Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/40319" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
InternalAggregations.reduce(aggregationsList, reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public void execute(SearchContext context) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}
context.queryResult().aggregations(new InternalAggregations(aggregations));
List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
Expand All @@ -144,7 +143,7 @@ public void execute(SearchContext context) {
+ "allowed at the top level");
}
}
context.queryResult().pipelineAggregators(siblingPipelineAggregators);
context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public InternalAggregations(List<InternalAggregation> aggregations, List<Sibling
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
* become part of the list of {@link InternalAggregation}s.
*/
List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
return topLevelPipelineAggregators;
}

Expand All @@ -91,20 +91,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
if (aggregationsList.isEmpty()) {
return null;
}
InternalAggregations first = aggregationsList.get(0);
return reduce(aggregationsList, first.topLevelPipelineAggregators, context);
}

/**
* Reduces the given list of aggregations as well as the provided top-level pipeline aggregators.
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> topLevelPipelineAggregators,
ReduceContext context) {
if (aggregationsList.isEmpty()) {
return null;
}
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();

// first we collect all aggregations of the same type and list them together
Map<String, List<InternalAggregation>> aggByName = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
Expand All @@ -37,7 +39,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
Expand All @@ -54,7 +55,6 @@ public final class QuerySearchResult extends SearchPhaseResult {
private DocValueFormat[] sortValueFormats;
private InternalAggregations aggregations;
private boolean hasAggs;
private List<SiblingPipelineAggregator> pipelineAggregators = Collections.emptyList();
private Suggest suggest;
private boolean searchTimedOut;
private Boolean terminatedEarly = null;
Expand Down Expand Up @@ -198,14 +198,6 @@ public void profileResults(ProfileShardResult shardResults) {
hasProfileResults = shardResults != null;
}

public List<SiblingPipelineAggregator> pipelineAggregators() {
return pipelineAggregators;
}

public void pipelineAggregators(List<SiblingPipelineAggregator> pipelineAggregators) {
this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators);
}

public Suggest suggest() {
return suggest;
}
Expand Down Expand Up @@ -294,8 +286,18 @@ public void readFromWithId(long id, StreamInput in) throws IOException {
if (hasAggs = in.readBoolean()) {
aggregations = InternalAggregations.readAggregations(in);
}
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
.collect(Collectors.toList());
if (in.getVersion().before(Version.V_7_1_0)) {
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
.map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we skip the rewriting if siblings are empty ?

if (hasAggs && pipelineAggregators.isEmpty() == false) {
List<InternalAggregation> internalAggs = aggregations.asList().stream()
.map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
//Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while
//later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1.
this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
}
}
if (in.readBoolean()) {
suggest = new Suggest(in);
}
Expand Down Expand Up @@ -332,7 +334,16 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeBoolean(true);
aggregations.writeTo(out);
}
out.writeNamedWriteableList(pipelineAggregators);
if (out.getVersion().before(Version.V_7_1_0)) {
//Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly,
//while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on.
if (aggregations == null) {
out.writeNamedWriteableList(Collections.emptyList());
} else {
out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators());
}
}
if (suggest == null) {
out.writeBoolean(false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ public class InternalAggregationsTests extends ESTestCase {
public void testReduceEmptyAggs() {
List<InternalAggregations> aggs = Collections.emptyList();
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean());
assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext));
assertNull(InternalAggregations.reduce(aggs, reduceContext));
}

public void testNonFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms)));
List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create());
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
topLevelPipelineAggs));
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false);
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext);
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext);
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(1, reducedAggs.aggregations.size());
}
Expand All @@ -79,15 +80,15 @@ public void testFinalReduceTopLevelPipelineAggs() {
Collections.singletonList(siblingPipelineAggregator));
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
} else {
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms));
List<SiblingPipelineAggregator> topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator);
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
Collections.singletonList(siblingPipelineAggregator));
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
}
assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(2, reducedAggs.aggregations.size());
}

public void testSerialization() throws Exception {
public static InternalAggregations createTestInstance() throws Exception {
List<InternalAggregation> aggsList = new ArrayList<>();
if (randomBoolean()) {
StringTermsTests stringTermsTests = new StringTermsTests();
Expand Down Expand Up @@ -116,7 +117,11 @@ public void testSerialization() throws Exception {
topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
}
}
InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs);
return new InternalAggregations(aggsList, topLevelPipelineAggs);
}

public void testSerialization() throws Exception {
InternalAggregations aggregations = createTestInstance();
writeToAndReadFrom(aggregations, 0);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.query;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalAggregationsTests;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.suggest.SuggestTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;

import java.util.List;

import static java.util.Collections.emptyList;

public class QuerySearchResultTests extends ESTestCase {

private final NamedWriteableRegistry namedWriteableRegistry;

public QuerySearchResultTests() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
this.namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
}

private static QuerySearchResult createTestInstance() throws Exception {
ShardId shardId = new ShardId("index", "uuid", randomInt());
QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE));
if (randomBoolean()) {
result.terminatedEarly(randomBoolean());
}
TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]);
result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]);
result.size(randomInt());
result.from(randomInt());
if (randomBoolean()) {
result.suggest(SuggestTests.createTestItem());
}
if (randomBoolean()) {
result.aggregations(InternalAggregationsTests.createTestInstance());
}
return result;
}

public void testSerialization() throws Exception {
QuerySearchResult querySearchResult = createTestInstance();
Version version = VersionUtils.randomVersion(random());
QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
assertEquals(querySearchResult.from(), deserialized.from());
assertEquals(querySearchResult.size(), deserialized.size());
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
if (deserialized.hasAggs()) {
Aggregations aggs = querySearchResult.consumeAggs();
Aggregations deserializedAggs = deserialized.consumeAggs();
assertEquals(aggs.asList(), deserializedAggs.asList());
List<SiblingPipelineAggregator> pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators();
List<SiblingPipelineAggregator> deserializedPipelineAggs =
((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators();
assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size());
for (int i = 0; i < pipelineAggs.size(); i++) {
SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i);
SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i);
assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths());
assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name());
}
}
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
}
}