diff --git a/build.gradle b/build.gradle index 80ed642369ace..d452170b5fb90 100644 --- a/build.gradle +++ b/build.gradle @@ -162,8 +162,8 @@ task verifyVersions { * after the backport of the backcompat code is complete. */ -boolean bwc_tests_enabled = true -final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ +boolean bwc_tests_enabled = false +final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/40319" /* place a PR link here when committing bwc changes */ if (bwc_tests_enabled == false) { if (bwc_tests_disabled_issue.isEmpty()) { throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false") diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index f54f101041d1b..0125084c37099 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -486,7 +486,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection pipelineAggregators = context.aggregations().factories().createPipelineAggregators(); List siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size()); for (PipelineAggregator pipelineAggregator : pipelineAggregators) { @@ -144,7 +143,7 @@ public void execute(SearchContext context) { + "allowed at the top level"); } } - context.queryResult().pipelineAggregators(siblingPipelineAggregators); + context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators)); // disable aggregations so that they don't run on next pages in case of scrolling context.aggregations(null); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 187f5e3864ed1..8910ca25c337d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -77,7 +77,7 @@ public InternalAggregations(List aggregations, List getTopLevelPipelineAggregators() { + public List getTopLevelPipelineAggregators() { return topLevelPipelineAggregators; } @@ -91,20 +91,7 @@ public static InternalAggregations reduce(List aggregation if (aggregationsList.isEmpty()) { return null; } - InternalAggregations first = aggregationsList.get(0); - return reduce(aggregationsList, first.topLevelPipelineAggregators, context); - } - - /** - * Reduces the given list of aggregations as well as the provided top-level pipeline aggregators. - * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. - */ - public static InternalAggregations reduce(List aggregationsList, - List topLevelPipelineAggregators, - ReduceContext context) { - if (aggregationsList.isEmpty()) { - return null; - } + List topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators(); // first we collect all aggregations of the same type and list them together Map> aggByName = new HashMap<>(); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 34d3508f6bab5..9f9a2c2680a1f 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -21,6 +21,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; @@ -28,6 +29,7 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; @@ -37,7 +39,6 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; @@ -54,7 +55,6 @@ public final class QuerySearchResult extends SearchPhaseResult { private DocValueFormat[] sortValueFormats; private InternalAggregations aggregations; private boolean hasAggs; - private List pipelineAggregators = Collections.emptyList(); private Suggest suggest; private boolean searchTimedOut; private Boolean terminatedEarly = null; @@ -198,14 +198,6 @@ public void profileResults(ProfileShardResult shardResults) { hasProfileResults = shardResults != null; } - public List pipelineAggregators() { - return pipelineAggregators; - } - - public void pipelineAggregators(List pipelineAggregators) { - this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators); - } - public Suggest suggest() { return suggest; } @@ -294,8 +286,18 @@ public void readFromWithId(long id, StreamInput in) throws IOException { if (hasAggs = in.readBoolean()) { aggregations = InternalAggregations.readAggregations(in); } - pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a) - .collect(Collectors.toList()); + if (in.getVersion().before(Version.V_7_1_0)) { + List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() + .map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList()); + if (hasAggs && pipelineAggregators.isEmpty() == false) { + List internalAggs = aggregations.asList().stream() + .map(agg -> (InternalAggregation) agg).collect(Collectors.toList()); + //Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while + //later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of + //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1. + this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators); + } + } if (in.readBoolean()) { suggest = new Suggest(in); } @@ -332,7 +334,16 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); aggregations.writeTo(out); } - out.writeNamedWriteableList(pipelineAggregators); + if (out.getVersion().before(Version.V_7_1_0)) { + //Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, + //while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of + //InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on. + if (aggregations == null) { + out.writeNamedWriteableList(Collections.emptyList()); + } else { + out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators()); + } + } if (suggest == null) { out.writeBoolean(false); } else { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index 81626459db4f2..aa244ff7a320b 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -50,18 +50,19 @@ public class InternalAggregationsTests extends ESTestCase { public void testReduceEmptyAggs() { List aggs = Collections.emptyList(); InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean()); - assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext)); + assertNull(InternalAggregations.reduce(aggs, reduceContext)); } public void testNonFinalReduceTopLevelPipelineAggs() { InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); - List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms))); List topLevelPipelineAggs = new ArrayList<>(); MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create()); + List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms), + topLevelPipelineAggs)); InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); - InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext); + InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext); assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(1, reducedAggs.aggregations.size()); } @@ -79,15 +80,15 @@ public void testFinalReduceTopLevelPipelineAggs() { Collections.singletonList(siblingPipelineAggregator)); reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); } else { - InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); - List topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator); - reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext); + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), + Collections.singletonList(siblingPipelineAggregator)); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); } assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); } - public void testSerialization() throws Exception { + public static InternalAggregations createTestInstance() throws Exception { List aggsList = new ArrayList<>(); if (randomBoolean()) { StringTermsTests stringTermsTests = new StringTermsTests(); @@ -116,7 +117,11 @@ public void testSerialization() throws Exception { topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); } } - InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); + return new InternalAggregations(aggsList, topLevelPipelineAggs); + } + + public void testSerialization() throws Exception { + InternalAggregations aggregations = createTestInstance(); writeToAndReadFrom(aggregations, 0); } diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java new file mode 100644 index 0000000000000..64712b3e417a0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.query; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.Version; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalAggregationsTests; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.suggest.SuggestTests; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import java.util.List; + +import static java.util.Collections.emptyList; + +public class QuerySearchResultTests extends ESTestCase { + + private final NamedWriteableRegistry namedWriteableRegistry; + + public QuerySearchResultTests() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); + this.namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); + } + + private static QuerySearchResult createTestInstance() throws Exception { + ShardId shardId = new ShardId("index", "uuid", randomInt()); + QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE)); + if (randomBoolean()) { + result.terminatedEarly(randomBoolean()); + } + TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]); + result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]); + result.size(randomInt()); + result.from(randomInt()); + if (randomBoolean()) { + result.suggest(SuggestTests.createTestItem()); + } + if (randomBoolean()) { + result.aggregations(InternalAggregationsTests.createTestInstance()); + } + return result; + } + + public void testSerialization() throws Exception { + QuerySearchResult querySearchResult = createTestInstance(); + Version version = VersionUtils.randomVersion(random()); + QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version); + assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId()); + assertNull(deserialized.getSearchShardTarget()); + assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f); + assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits); + assertEquals(querySearchResult.from(), deserialized.from()); + assertEquals(querySearchResult.size(), deserialized.size()); + assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); + if (deserialized.hasAggs()) { + Aggregations aggs = querySearchResult.consumeAggs(); + Aggregations deserializedAggs = deserialized.consumeAggs(); + assertEquals(aggs.asList(), deserializedAggs.asList()); + List pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators(); + List deserializedPipelineAggs = + ((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators(); + assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size()); + for (int i = 0; i < pipelineAggs.size(); i++) { + SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i); + SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i); + assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths()); + assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name()); + } + } + assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); + } +}