Skip to content

Commit a0b02ce

Browse files
committed
Move top-level pipeline aggs out of QuerySearchResult (#40319)
As part of #40177 we have added top-level pipeline aggs to `InternalAggregations`. Given that `QuerySearchResult` holds an `InternalAggregations` instance, there is no need to keep on setting top-level pipeline aggs separately. Top-level pipeline aggs can then always be transported through `InternalAggregations`. Such change is made in a backwards compatible manner.
1 parent f6eefd4 commit a0b02ce

File tree

6 files changed

+174
-39
lines changed

6 files changed

+174
-39
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
486486
}
487487
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
488488
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
489-
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
489+
InternalAggregations.reduce(aggregationsList, reduceContext);
490490
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
491491
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
492492
reducedCompletionSuggestions);

server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ public void execute(SearchContext context) {
132132
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
133133
}
134134
}
135-
context.queryResult().aggregations(new InternalAggregations(aggregations));
136135
List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
137136
List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
138137
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
@@ -144,7 +143,7 @@ public void execute(SearchContext context) {
144143
+ "allowed at the top level");
145144
}
146145
}
147-
context.queryResult().pipelineAggregators(siblingPipelineAggregators);
146+
context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));
148147

149148
// disable aggregations so that they don't run on next pages in case of scrolling
150149
context.aggregations(null);

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public InternalAggregations(List<InternalAggregation> aggregations, List<Sibling
7878
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
7979
* become part of the list of {@link InternalAggregation}s.
8080
*/
81-
List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
81+
public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
8282
return topLevelPipelineAggregators;
8383
}
8484

@@ -92,20 +92,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
9292
if (aggregationsList.isEmpty()) {
9393
return null;
9494
}
95-
InternalAggregations first = aggregationsList.get(0);
96-
return reduce(aggregationsList, first.topLevelPipelineAggregators, context);
97-
}
98-
99-
/**
100-
* Reduces the given list of aggregations as well as the provided top-level pipeline aggregators.
101-
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
102-
*/
103-
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
104-
List<SiblingPipelineAggregator> topLevelPipelineAggregators,
105-
ReduceContext context) {
106-
if (aggregationsList.isEmpty()) {
107-
return null;
108-
}
95+
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();
10996

11097
// first we collect all aggregations of the same type and list them together
11198
Map<String, List<InternalAggregation>> aggByName = new HashMap<>();

server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.search.SearchPhaseResult;
3030
import org.elasticsearch.search.SearchShardTarget;
3131
import org.elasticsearch.search.aggregations.Aggregations;
32+
import org.elasticsearch.search.aggregations.InternalAggregation;
3233
import org.elasticsearch.search.aggregations.InternalAggregations;
3334
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
3435
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
@@ -38,7 +39,6 @@
3839
import java.io.IOException;
3940
import java.util.Collections;
4041
import java.util.List;
41-
import java.util.Objects;
4242
import java.util.stream.Collectors;
4343

4444
import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
@@ -55,7 +55,6 @@ public final class QuerySearchResult extends SearchPhaseResult {
5555
private DocValueFormat[] sortValueFormats;
5656
private InternalAggregations aggregations;
5757
private boolean hasAggs;
58-
private List<SiblingPipelineAggregator> pipelineAggregators = Collections.emptyList();
5958
private Suggest suggest;
6059
private boolean searchTimedOut;
6160
private Boolean terminatedEarly = null;
@@ -199,14 +198,6 @@ public void profileResults(ProfileShardResult shardResults) {
199198
hasProfileResults = shardResults != null;
200199
}
201200

202-
public List<SiblingPipelineAggregator> pipelineAggregators() {
203-
return pipelineAggregators;
204-
}
205-
206-
public void pipelineAggregators(List<SiblingPipelineAggregator> pipelineAggregators) {
207-
this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators);
208-
}
209-
210201
public Suggest suggest() {
211202
return suggest;
212203
}
@@ -295,8 +286,18 @@ public void readFromWithId(long id, StreamInput in) throws IOException {
295286
if (hasAggs = in.readBoolean()) {
296287
aggregations = InternalAggregations.readAggregations(in);
297288
}
298-
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
299-
.collect(Collectors.toList());
289+
if (in.getVersion().before(Version.V_7_1_0)) {
290+
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
291+
.map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
292+
if (hasAggs && pipelineAggregators.isEmpty() == false) {
293+
List<InternalAggregation> internalAggs = aggregations.asList().stream()
294+
.map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
295+
//Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while
296+
//later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
297+
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1.
298+
this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
299+
}
300+
}
300301
if (in.readBoolean()) {
301302
suggest = new Suggest(in);
302303
}
@@ -338,7 +339,16 @@ public void writeToNoId(StreamOutput out) throws IOException {
338339
out.writeBoolean(true);
339340
aggregations.writeTo(out);
340341
}
341-
out.writeNamedWriteableList(pipelineAggregators);
342+
if (out.getVersion().before(Version.V_7_1_0)) {
343+
//Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly,
344+
//while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
345+
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on.
346+
if (aggregations == null) {
347+
out.writeNamedWriteableList(Collections.emptyList());
348+
} else {
349+
out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators());
350+
}
351+
}
342352
if (suggest == null) {
343353
out.writeBoolean(false);
344354
} else {

server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,19 @@ public class InternalAggregationsTests extends ESTestCase {
5050
public void testReduceEmptyAggs() {
5151
List<InternalAggregations> aggs = Collections.emptyList();
5252
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean());
53-
assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext));
53+
assertNull(InternalAggregations.reduce(aggs, reduceContext));
5454
}
5555

5656
public void testNonFinalReduceTopLevelPipelineAggs() {
5757
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
5858
10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
59-
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms)));
6059
List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
6160
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
6261
topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create());
62+
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
63+
topLevelPipelineAggs));
6364
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false);
64-
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext);
65+
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext);
6566
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
6667
assertEquals(1, reducedAggs.aggregations.size());
6768
}
@@ -79,15 +80,15 @@ public void testFinalReduceTopLevelPipelineAggs() {
7980
Collections.singletonList(siblingPipelineAggregator));
8081
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
8182
} else {
82-
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms));
83-
List<SiblingPipelineAggregator> topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator);
84-
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext);
83+
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
84+
Collections.singletonList(siblingPipelineAggregator));
85+
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
8586
}
8687
assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
8788
assertEquals(2, reducedAggs.aggregations.size());
8889
}
8990

90-
public void testSerialization() throws Exception {
91+
public static InternalAggregations createTestInstance() throws Exception {
9192
List<InternalAggregation> aggsList = new ArrayList<>();
9293
if (randomBoolean()) {
9394
StringTermsTests stringTermsTests = new StringTermsTests();
@@ -116,7 +117,11 @@ public void testSerialization() throws Exception {
116117
topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
117118
}
118119
}
119-
InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs);
120+
return new InternalAggregations(aggsList, topLevelPipelineAggs);
121+
}
122+
123+
public void testSerialization() throws Exception {
124+
InternalAggregations aggregations = createTestInstance();
120125
writeToAndReadFrom(aggregations, 0);
121126
}
122127

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.query;
21+
22+
import org.apache.lucene.search.ScoreDoc;
23+
import org.apache.lucene.search.TopDocs;
24+
import org.apache.lucene.search.TotalHits;
25+
import org.elasticsearch.Version;
26+
import org.elasticsearch.action.OriginalIndices;
27+
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
28+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
29+
import org.elasticsearch.common.io.stream.StreamInput;
30+
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.index.shard.ShardId;
33+
import org.elasticsearch.search.DocValueFormat;
34+
import org.elasticsearch.search.SearchModule;
35+
import org.elasticsearch.search.SearchShardTarget;
36+
import org.elasticsearch.search.aggregations.Aggregations;
37+
import org.elasticsearch.search.aggregations.InternalAggregations;
38+
import org.elasticsearch.search.aggregations.InternalAggregationsTests;
39+
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
40+
import org.elasticsearch.search.suggest.SuggestTests;
41+
import org.elasticsearch.test.ESTestCase;
42+
import org.elasticsearch.test.VersionUtils;
43+
44+
import java.io.IOException;
45+
import java.util.Base64;
46+
import java.util.List;
47+
48+
import static java.util.Collections.emptyList;
49+
50+
public class QuerySearchResultTests extends ESTestCase {
51+
52+
private final NamedWriteableRegistry namedWriteableRegistry;
53+
54+
public QuerySearchResultTests() {
55+
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
56+
this.namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
57+
}
58+
59+
private static QuerySearchResult createTestInstance() throws Exception {
60+
ShardId shardId = new ShardId("index", "uuid", randomInt());
61+
QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE));
62+
if (randomBoolean()) {
63+
result.terminatedEarly(randomBoolean());
64+
}
65+
TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]);
66+
result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]);
67+
result.size(randomInt());
68+
result.from(randomInt());
69+
if (randomBoolean()) {
70+
result.suggest(SuggestTests.createTestItem());
71+
}
72+
if (randomBoolean()) {
73+
result.aggregations(InternalAggregationsTests.createTestInstance());
74+
}
75+
return result;
76+
}
77+
78+
public void testSerialization() throws Exception {
79+
QuerySearchResult querySearchResult = createTestInstance();
80+
Version version = VersionUtils.randomVersion(random());
81+
QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
82+
assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
83+
assertNull(deserialized.getSearchShardTarget());
84+
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
85+
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
86+
assertEquals(querySearchResult.from(), deserialized.from());
87+
assertEquals(querySearchResult.size(), deserialized.size());
88+
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
89+
if (deserialized.hasAggs()) {
90+
Aggregations aggs = querySearchResult.consumeAggs();
91+
Aggregations deserializedAggs = deserialized.consumeAggs();
92+
assertEquals(aggs.asList(), deserializedAggs.asList());
93+
List<SiblingPipelineAggregator> pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators();
94+
List<SiblingPipelineAggregator> deserializedPipelineAggs =
95+
((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators();
96+
assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size());
97+
for (int i = 0; i < pipelineAggs.size(); i++) {
98+
SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i);
99+
SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i);
100+
assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths());
101+
assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name());
102+
}
103+
}
104+
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
105+
}
106+
107+
public void testReadFromPre_7_1_0() throws IOException {
108+
String message = "AAAAAAAAAGQAAAEAAAB/wAAAAAEBBnN0ZXJtcwVJblhNRgoDBVNhdWpvAAVrS3l3cwVHSVVZaAAFZXRUbEUFZGN0WVoABXhzYnVrAAEDAfoN" +
109+
"A3JhdwUBAAJRAAAAAAAAA30DBnN0ZXJtcwVNdVVFRwoAAAEDAfoNA3JhdwUBAAdDAAAAAAAAA30AAApQVkFhaUxSdHh5TAAAAAAAAAN9AAAKTVRUeUxnd1hyd" +
110+
"y0AAAAAAAADfQAACnZRQXZ3cWp0SmwPAAAAAAAAA30AAApmYXNyUUhNVWZBCwAAAAAAAAN9AAAKT3FIQ2RMZ1JZUwUAAAAAAAADfQAACm9jT05aZmZ4ZmUmAA" +
111+
"AAAAAAA30AAApvb0tJTkdvbHdzBnN0ZXJtcwVtRmlmZAoAAAEDAfoNA3JhdwUBAARXAAAAAAAAA30AAApZd3BwQlpBZEhpMQAAAAAAAAN9AAAKREZ3UVpTSXh" +
112+
"DSE4AAAAAAAADfQAAClVMZW1YZGtkSHUUAAAAAAAAA30AAApBUVdKVk1kTlF1BnN0ZXJtcwVxbkJGVgoAAAEDAfoNA3JhdwUBAAYJAAAAAAAAA30AAApBS2NL" +
113+
"U1ZVS25EIQAAAAAAAAN9AAAKWGpCbXZBZmduRhsAAAAAAAADfQAACk54TkJEV3pLRmI7AAAAAAAAA30AAApydkdaZnJycXhWSAAAAAAAAAN9AAAKSURVZ3JhQ" +
114+
"lFHSy4AAAAAAAADfQAACmJmZ0x5YlFlVksAClRJZHJlSkpVc1Y4AAAAAAAAA30DBnN0ZXJtcwVNdVVFRwoAAAEDAfoNA3JhdwUBAAdDAAAAAAAAA30AAApQVk" +
115+
"FhaUxSdHh5TAAAAAAAAAN9AAAKTVRUeUxnd1hydy0AAAAAAAADfQAACnZRQXZ3cWp0SmwPAAAAAAAAA30AAApmYXNyUUhNVWZBCwAAAAAAAAN9AAAKT3FIQ2R" +
116+
"MZ1JZUwUAAAAAAAADfQAACm9jT05aZmZ4ZmUmAAAAAAAAA30AAApvb0tJTkdvbHdzBnN0ZXJtcwVtRmlmZAoAAAEDAfoNA3JhdwUBAARXAAAAAAAAA30AAApZ" +
117+
"d3BwQlpBZEhpMQAAAAAAAAN9AAAKREZ3UVpTSXhDSE4AAAAAAAADfQAAClVMZW1YZGtkSHUUAAAAAAAAA30AAApBUVdKVk1kTlF1BnN0ZXJtcwVxbkJGVgoAA" +
118+
"AEDAfoNA3JhdwUBAAYJAAAAAAAAA30AAApBS2NLU1ZVS25EIQAAAAAAAAN9AAAKWGpCbXZBZmduRhsAAAAAAAADfQAACk54TkJEV3pLRmI7AAAAAAAAA30AAA" +
119+
"pydkdaZnJycXhWSAAAAAAAAAN9AAAKSURVZ3JhQlFHSy4AAAAAAAADfQAACmJmZ0x5YlFlVksACm5rdExLUHp3cGgBCm1heF9idWNrZXQFbmFtZTEBB2J1Y2t" +
120+
"ldDH/A3JhdwEBCm1heF9idWNrZXQFbmFtZTEBB2J1Y2tldDH/A3JhdwEAAAIAAf////8AAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
121+
byte[] bytes = Base64.getDecoder().decode(message);
122+
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry)) {
123+
in.setVersion(Version.V_7_0_0);
124+
QuerySearchResult querySearchResult = new QuerySearchResult();
125+
querySearchResult.readFrom(in);
126+
assertEquals(100, querySearchResult.getRequestId());
127+
assertTrue(querySearchResult.hasAggs());
128+
InternalAggregations aggs = (InternalAggregations)querySearchResult.consumeAggs();
129+
assertEquals(1, aggs.asList().size());
130+
//top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately
131+
assertEquals(1, aggs.getTopLevelPipelineAggregators().size());
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)