From 295ee9f59995559924ae849efd4c6c94300c47e7 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 15 Mar 2019 12:22:04 +0100 Subject: [PATCH 1/5] Serialize top-level pipeline aggs as part of InternalAggregations We currently convert pipeline aggregators to their corresponding InternalAggregation instance as part of the final reduction phase. They arrive to the coordinating node as part of QuerySearchResult objects fom the shards and, despite we may incrementally reduce aggs (hence we may have some non-final reduce and the final one later) all the reduction phases happen on the same node. With CCS minimizing roundtrips though, each cluster performs its own non-final reduction, and then serializes the results back to the CCS coordinating node which will perform the final coordination. This breaks the assumptions made up until now around reductions happening all on the same node. With #40101 we have made sure that top-level pipeline aggs are not reduced as part of the non-final reduction. The next step is to make sure that they don't get lost, meaning that each coordinating node needs to send them back to the CCS coordinating node as part of the top-level `InternalAggregations` object. Closes #40059 --- .../aggregations/InternalAggregations.java | 69 ++++++-- .../InternalAggregationsTests.java | 160 ++++++++++++++++++ 2 files changed, 216 insertions(+), 13 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 69adb79cb2b84..fe3e2105a8f8f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -18,10 +18,12 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import java.io.IOException; @@ -49,6 +51,8 @@ public final class InternalAggregations extends Aggregations implements Streamab } }; + private List topLevelPipelineAggregators; + private InternalAggregations() { } @@ -60,18 +64,42 @@ public InternalAggregations(List aggregations) { } /** - * Reduces the given list of aggregations + * Constructs a new aggregation providing its {@link InternalAggregation}s and {@link SiblingPipelineAggregator}s + */ + public InternalAggregations(List aggregations, List topLevelPipelineAggregators) { + super(aggregations); + this.topLevelPipelineAggregators = topLevelPipelineAggregators; + } + + /** + * Returns the top-level pipeline aggregators. + * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they + * become part of the list of {@link InternalAggregation}s. */ - public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { - return reduce(aggregationsList, null, context); + List getTopLevelPipelineAggregators() { + return topLevelPipelineAggregators; } /** - * Reduces the given list of aggregations as well as the provided sibling pipeline aggregators. - * Note that sibling pipeline aggregators are ignored when non final reduction is performed. + * Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first + * {@link InternalAggregations} object found in the list. + * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. */ public static InternalAggregations reduce(List aggregationsList, - List siblingPipelineAggregators, + ReduceContext context) { + if (aggregationsList.isEmpty()) { + return null; + } + InternalAggregations first = aggregationsList.get(0); + return reduce(aggregationsList, first.topLevelPipelineAggregators, context); + } + + /** + * Reduces the given list of aggregations as well as the provided top-level pipeline aggregators. + * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. + */ + public static InternalAggregations reduce(List aggregationsList, + List topLevelPipelineAggregators, ReduceContext context) { if (aggregationsList.isEmpty()) { return null; @@ -98,15 +126,14 @@ public static InternalAggregations reduce(List aggregation reducedAggregations.add(first.reduce(aggregations, context)); } - if (siblingPipelineAggregators != null) { - if (context.isFinalReduce()) { - for (SiblingPipelineAggregator pipelineAggregator : siblingPipelineAggregators) { - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); - reducedAggregations.add(newAgg); - } + if (topLevelPipelineAggregators != null && context.isFinalReduce()) { + for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { + InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); + reducedAggregations.add(newAgg); } + return new InternalAggregations(reducedAggregations); } - return new InternalAggregations(reducedAggregations); + return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators); } public static InternalAggregations readAggregations(StreamInput in) throws IOException { @@ -121,11 +148,27 @@ public void readFrom(StreamInput in) throws IOException { if (aggregations.isEmpty()) { aggregationsAsMap = emptyMap(); } + //TODO update version after backport + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + if (in.readBoolean()) { + this.topLevelPipelineAggregators = in.readList( + stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); + } + } } @Override @SuppressWarnings("unchecked") public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteableList((List)aggregations); + //TODO update version after backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (topLevelPipelineAggregators == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeNamedWriteableList(topLevelPipelineAggregators); + } + } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java new file mode 100644 index 0000000000000..433fcd895821d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -0,0 +1,160 @@ +package org.elasticsearch.search.aggregations; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.StringTermsTests; +import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests; +import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; +import org.hamcrest.Matchers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.List; + +import static java.util.Collections.emptyList; + +public class InternalAggregationsTests extends ESTestCase { + + private final NamedWriteableRegistry registry = new NamedWriteableRegistry( + new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables()); + + public void testReduceEmptyAggs() { + List aggs = Collections.emptyList(); + List topLevelPipelineAggs = randomBoolean() ? Collections.emptyList() : null; + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean()); + assertNull(InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext)); + } + + public void testNonFinalReduceTopLevelPipelineAggs() throws IOException { + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); + List aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms))); + List topLevelPipelineAggs = new ArrayList<>(); + MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); + topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create()); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); + InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext); + assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); + assertEquals(1, reducedAggs.aggregations.size()); + } + + public void testFinalReduceTopLevelPipelineAggs() throws IOException { + InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), + 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); + + MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); + SiblingPipelineAggregator siblingPipelineAggregator = (SiblingPipelineAggregator) maxBucketPipelineAggregationBuilder.create(); + InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, true); + final InternalAggregations reducedAggs; + if (randomBoolean()) { + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), + Collections.singletonList(siblingPipelineAggregator)); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); + } else { + InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); + List topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator); + reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext); + } + assertNull(reducedAggs.getTopLevelPipelineAggregators()); + assertEquals(2, reducedAggs.aggregations.size()); + } + + public void testSerialization() throws Exception { + List aggsList = new ArrayList<>(); + if (randomBoolean()) { + StringTermsTests stringTermsTests = new StringTermsTests(); + stringTermsTests.init(); + stringTermsTests.setUp(); + aggsList.add(stringTermsTests.createTestInstance()); + } + if (randomBoolean()) { + InternalDateHistogramTests dateHistogramTests = new InternalDateHistogramTests(); + dateHistogramTests.setUp(); + aggsList.add(dateHistogramTests.createTestInstance()); + } + if (randomBoolean()) { + InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); + aggsList.add(simpleValueTests.createTestInstance()); + } + List topLevelPipelineAggs = null; + if (randomBoolean()) { + topLevelPipelineAggs = new ArrayList<>(); + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); + } + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); + } + if (randomBoolean()) { + topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); + } + } + + Version version = VersionUtils.randomVersion(random()); + + InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + aggregations.writeTo(out); + try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), registry)) { + in.setVersion(version); + InternalAggregations deserialized = InternalAggregations.readAggregations(in); + assertEquals(aggregations.aggregations, deserialized.aggregations); + if (aggregations.getTopLevelPipelineAggregators() == null) { + assertNull(deserialized.getTopLevelPipelineAggregators()); + } else { + //TODO update version after backport + if (version.onOrAfter(Version.V_8_0_0)) { + assertNotNull(deserialized.getTopLevelPipelineAggregators()); + assertEquals(aggregations.getTopLevelPipelineAggregators().size(), deserialized.getTopLevelPipelineAggregators().size()); + for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) { + SiblingPipelineAggregator siblingPipelineAggregator1 = aggregations.getTopLevelPipelineAggregators().get(i); + SiblingPipelineAggregator siblingPipelineAggregator2 = deserialized.getTopLevelPipelineAggregators().get(i); + assertArrayEquals(siblingPipelineAggregator1.bucketsPaths(), siblingPipelineAggregator2.bucketsPaths()); + assertEquals(siblingPipelineAggregator1.name(), siblingPipelineAggregator2.name()); + } + } else { + assertNull(deserialized.getTopLevelPipelineAggregators()); + } + } + } + } + } + + //TODO update version and rename after backport + public void testSerializationFromPre_8_0_0() throws IOException { + String aggsString = "AwZzdGVybXMFb0F0Q0EKCQVsZG5ncgAFeG56RWcFeUFxVmcABXBhQVVpBUtYc2VIAAVaclRESwVqUkxySAAFelp5d1AFRUREcEYABW1" + + "sckF0BU5wWWVFAAVJYVJmZgVURlJVbgAFT0RiU04FUWNwSVoABU1sb09HBUNzZHFlAAVWWmJHaQABAwGIDgNyYXcFAQAADmRhdGVfaGlzdG9ncmFt" + + "BVhHbVl4/wADAAKAurcDA1VUQwABAQAAAWmOhukAAQAAAWmR9dEAAAAAAAAAAAAAAANyYXcACAAAAWmQrDoAUQAAAAFpkRoXAEMAAAABaZGH9AAtA" + + "AAAAWmR9dEAJwAAAAFpkmOuAFwAAAABaZLRiwAYAAAAAWmTP2gAKgAAAAFpk61FABsADHNpbXBsZV92YWx1ZQVsWVNLVv8AB2RlY2ltYWwGIyMjLi" + + "MjQLZWZVy5zBYAAAAAAAAAAAAAAAAAAAAAAAAA"; + + byte[] aggsBytes = Base64.getDecoder().decode(aggsString); + try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(aggsBytes), registry)) { + in.setVersion(VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), + Version.max(Version.CURRENT.minimumCompatibilityVersion(), VersionUtils.getPreviousVersion(Version.CURRENT)))); + InternalAggregations deserialized = InternalAggregations.readAggregations(in); + assertEquals(3, deserialized.aggregations.size()); + assertThat(deserialized.aggregations.get(0), Matchers.instanceOf(StringTerms.class)); + assertThat(deserialized.aggregations.get(1), Matchers.instanceOf(InternalDateHistogram.class)); + assertThat(deserialized.aggregations.get(2), Matchers.instanceOf(InternalSimpleValue.class)); + } + } +} From 876c6ca0a6ce0a1fab5931f0f078a9a9bbd3dc22 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 18 Mar 2019 21:50:12 +0100 Subject: [PATCH 2/5] line length --- .../search/aggregations/InternalAggregationsTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index 433fcd895821d..a6d6e3e01d527 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -123,7 +123,8 @@ public void testSerialization() throws Exception { //TODO update version after backport if (version.onOrAfter(Version.V_8_0_0)) { assertNotNull(deserialized.getTopLevelPipelineAggregators()); - assertEquals(aggregations.getTopLevelPipelineAggregators().size(), deserialized.getTopLevelPipelineAggregators().size()); + assertEquals(aggregations.getTopLevelPipelineAggregators().size(), + deserialized.getTopLevelPipelineAggregators().size()); for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) { SiblingPipelineAggregator siblingPipelineAggregator1 = aggregations.getTopLevelPipelineAggregators().get(i); SiblingPipelineAggregator siblingPipelineAggregator2 = deserialized.getTopLevelPipelineAggregators().get(i); From f1fe3a8433fc31444e25d8e1a73ca9ddf25083d1 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 18 Mar 2019 22:15:16 +0100 Subject: [PATCH 3/5] license header --- .../InternalAggregationsTests.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index a6d6e3e01d527..a30c472ac46b3 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.elasticsearch.search.aggregations; import org.elasticsearch.Version; From 88c47281ca098d47fc8028e954f2137296fdf9f0 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 18 Mar 2019 23:02:28 +0100 Subject: [PATCH 4/5] address comments --- .../aggregations/InternalAggregations.java | 23 ++++++-------- .../InternalAggregationsTests.java | 31 ++++++++++--------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index fe3e2105a8f8f..28742df6c093e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -28,10 +28,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import static java.util.Collections.emptyMap; @@ -51,7 +53,7 @@ public final class InternalAggregations extends Aggregations implements Streamab } }; - private List topLevelPipelineAggregators; + private List topLevelPipelineAggregators = Collections.emptyList(); private InternalAggregations() { } @@ -68,7 +70,7 @@ public InternalAggregations(List aggregations) { */ public InternalAggregations(List aggregations, List topLevelPipelineAggregators) { super(aggregations); - this.topLevelPipelineAggregators = topLevelPipelineAggregators; + this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators); } /** @@ -126,7 +128,7 @@ public static InternalAggregations reduce(List aggregation reducedAggregations.add(first.reduce(aggregations, context)); } - if (topLevelPipelineAggregators != null && context.isFinalReduce()) { + if (context.isFinalReduce()) { for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) { InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context); reducedAggregations.add(newAgg); @@ -150,10 +152,10 @@ public void readFrom(StreamInput in) throws IOException { } //TODO update version after backport if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - if (in.readBoolean()) { - this.topLevelPipelineAggregators = in.readList( - stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); - } + this.topLevelPipelineAggregators = in.readList( + stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); + } else { + this.topLevelPipelineAggregators = Collections.emptyList(); } } @@ -163,12 +165,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteableList((List)aggregations); //TODO update version after backport if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - if (topLevelPipelineAggregators == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeNamedWriteableList(topLevelPipelineAggregators); - } + out.writeNamedWriteableList(topLevelPipelineAggregators); } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index a30c472ac46b3..00c8379b8b2bc 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -46,18 +46,15 @@ import java.util.Collections; import java.util.List; -import static java.util.Collections.emptyList; - public class InternalAggregationsTests extends ESTestCase { private final NamedWriteableRegistry registry = new NamedWriteableRegistry( - new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables()); + new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedWriteables()); public void testReduceEmptyAggs() { List aggs = Collections.emptyList(); - List topLevelPipelineAggs = randomBoolean() ? Collections.emptyList() : null; InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean()); - assertNull(InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext)); + assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext)); } public void testNonFinalReduceTopLevelPipelineAggs() throws IOException { @@ -90,7 +87,7 @@ public void testFinalReduceTopLevelPipelineAggs() throws IOException { List topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator); reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext); } - assertNull(reducedAggs.getTopLevelPipelineAggregators()); + assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); assertEquals(2, reducedAggs.aggregations.size()); } @@ -111,9 +108,8 @@ public void testSerialization() throws Exception { InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); aggsList.add(simpleValueTests.createTestInstance()); } - List topLevelPipelineAggs = null; + List topLevelPipelineAggs = new ArrayList<>(); if (randomBoolean()) { - topLevelPipelineAggs = new ArrayList<>(); if (randomBoolean()) { topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); } @@ -124,10 +120,12 @@ public void testSerialization() throws Exception { topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); } } + InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); + writeToAndReadFrom(aggregations, 0); + } + private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException { Version version = VersionUtils.randomVersion(random()); - - InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); try (BytesStreamOutput out = new BytesStreamOutput()) { out.setVersion(version); aggregations.writeTo(out); @@ -136,11 +134,12 @@ public void testSerialization() throws Exception { InternalAggregations deserialized = InternalAggregations.readAggregations(in); assertEquals(aggregations.aggregations, deserialized.aggregations); if (aggregations.getTopLevelPipelineAggregators() == null) { - assertNull(deserialized.getTopLevelPipelineAggregators()); + assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); } else { //TODO update version after backport - if (version.onOrAfter(Version.V_8_0_0)) { - assertNotNull(deserialized.getTopLevelPipelineAggregators()); + if (version.before(Version.V_8_0_0)) { + assertEquals(0, deserialized.getTopLevelPipelineAggregators().size()); + } else { assertEquals(aggregations.getTopLevelPipelineAggregators().size(), deserialized.getTopLevelPipelineAggregators().size()); for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) { @@ -149,10 +148,12 @@ public void testSerialization() throws Exception { assertArrayEquals(siblingPipelineAggregator1.bucketsPaths(), siblingPipelineAggregator2.bucketsPaths()); assertEquals(siblingPipelineAggregator1.name(), siblingPipelineAggregator2.name()); } - } else { - assertNull(deserialized.getTopLevelPipelineAggregators()); } } + if (iteration < 2) { + //serialize this enough times to make sure that we are able to write again what we read + writeToAndReadFrom(deserialized, iteration + 1); + } } } } From 5cbfedcc71c8b477a2abbaa01f557daf6b38646e Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 18 Mar 2019 23:43:26 +0100 Subject: [PATCH 5/5] address test failures --- .../elasticsearch/search/query/QuerySearchResult.java | 10 +++++----- .../action/search/TransportSearchActionTests.java | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 3e5f1f65692fa..34d3508f6bab5 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -35,10 +35,11 @@ import org.elasticsearch.search.suggest.Suggest; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; -import static java.util.Collections.emptyList; import static org.elasticsearch.common.lucene.Lucene.readTopDocs; import static org.elasticsearch.common.lucene.Lucene.writeTopDocs; @@ -53,7 +54,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private DocValueFormat[] sortValueFormats; private InternalAggregations aggregations; private boolean hasAggs; - private List pipelineAggregators; + private List pipelineAggregators = Collections.emptyList(); private Suggest suggest; private boolean searchTimedOut; private Boolean terminatedEarly = null; @@ -79,7 +80,6 @@ public QuerySearchResult queryResult() { return this; } - public void searchTimedOut(boolean searchTimedOut) { this.searchTimedOut = searchTimedOut; } @@ -203,7 +203,7 @@ public List pipelineAggregators() { } public void pipelineAggregators(List pipelineAggregators) { - this.pipelineAggregators = pipelineAggregators; + this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators); } public Suggest suggest() { @@ -332,7 +332,7 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); aggregations.writeTo(out); } - out.writeNamedWriteableList(pipelineAggregators == null ? emptyList() : pipelineAggregators); + out.writeNamedWriteableList(pipelineAggregators); if (suggest == null) { out.writeBoolean(false); } else { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index cfcea61a160da..369d71f05ffb8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -419,7 +419,8 @@ public void testCCSRemoteReduce() throws Exception { OriginalIndices localIndices = local ? new OriginalIndices(new String[]{"index"}, SearchRequest.DEFAULT_INDICES_OPTIONS) : null; int totalClusters = numClusters + (local ? 1 : 0); TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0); - Function reduceContext = finalReduce -> null; + Function reduceContext = + finalReduce -> new InternalAggregation.ReduceContext(null, null, finalReduce); try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests();