From 5867705a41ba174956ffa20437fcb6adbb0941fc Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 6 May 2019 12:16:15 +0200 Subject: [PATCH] Move InternalAggregations to Writeable Relates to #34389 --- .../search/aggregations/Aggregations.java | 11 ++-- .../aggregations/InternalAggregations.java | 54 +++++++------------ .../InternalSingleBucketAggregation.java | 2 +- .../adjacency/InternalAdjacencyMatrix.java | 2 +- .../bucket/composite/InternalComposite.java | 2 +- .../bucket/filter/InternalFilters.java | 2 +- .../bucket/geogrid/InternalGeoGridBucket.java | 2 +- .../histogram/InternalAutoDateHistogram.java | 4 +- .../histogram/InternalDateHistogram.java | 4 +- .../bucket/histogram/InternalHistogram.java | 4 +- .../bucket/range/InternalBinaryRange.java | 2 +- .../bucket/range/InternalRange.java | 2 +- .../significant/SignificantLongTerms.java | 2 +- .../significant/SignificantStringTerms.java | 2 +- .../bucket/terms/InternalTerms.java | 2 +- .../internal/InternalSearchResponse.java | 4 +- .../search/query/QuerySearchResult.java | 2 +- .../InternalAggregationsTests.java | 2 +- 18 files changed, 46 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java index af9ea84ec9de8..ae9000b4b75ff 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; +import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.xcontent.XContentParserUtils.parseTypedKeysObject; @@ -44,14 +45,14 @@ public class Aggregations implements Iterable, ToXContentFragment { public static final String AGGREGATIONS_FIELD = "aggregations"; - protected List aggregations = Collections.emptyList(); - protected Map aggregationsAsMap; - - protected Aggregations() { - } + protected final List aggregations; + private Map aggregationsAsMap; public Aggregations(List aggregations) { this.aggregations = aggregations; + if (aggregations.isEmpty()) { + aggregationsAsMap = emptyMap(); + } } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 8910ca25c337d..e1597c5c8c063 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; @@ -34,14 +34,13 @@ import java.util.Map; import java.util.Objects; -import static java.util.Collections.emptyMap; - /** * An internal implementation of {@link Aggregations}. */ -public final class InternalAggregations extends Aggregations implements Streamable { +public final class InternalAggregations extends Aggregations implements Writeable { + + public static final InternalAggregations EMPTY = new InternalAggregations(Collections.emptyList()); - public static final InternalAggregations EMPTY = new InternalAggregations(); private static final Comparator INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> { if (agg1.isMapped() == agg2.isMapped()) { return 0; @@ -52,16 +51,14 @@ public final class InternalAggregations extends Aggregations implements Streamab } }; - private List topLevelPipelineAggregators = Collections.emptyList(); - - private InternalAggregations() { - } + private final List topLevelPipelineAggregators; /** * Constructs a new aggregation. */ public InternalAggregations(List aggregations) { super(aggregations); + this.topLevelPipelineAggregators = Collections.emptyList(); } /** @@ -72,6 +69,19 @@ public InternalAggregations(List aggregations, List in.readNamedWriteable(InternalAggregation.class))); + this.topLevelPipelineAggregators = in.readList( + stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); + } + + @Override + @SuppressWarnings("unchecked") + public void writeTo(StreamOutput out) throws IOException { + out.writeNamedWriteableList((List)aggregations); + out.writeNamedWriteableList(topLevelPipelineAggregators); + } + /** * Returns the top-level pipeline aggregators. * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they @@ -86,8 +96,7 @@ public List getTopLevelPipelineAggregators() { * {@link InternalAggregations} object found in the list. * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched. */ - public static InternalAggregations reduce(List aggregationsList, - ReduceContext context) { + public static InternalAggregations reduce(List aggregationsList, ReduceContext context) { if (aggregationsList.isEmpty()) { return null; } @@ -123,27 +132,4 @@ public static InternalAggregations reduce(List aggregation } return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators); } - - public static InternalAggregations readAggregations(StreamInput in) throws IOException { - InternalAggregations result = new InternalAggregations(); - result.readFrom(in); - return result; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - aggregations = in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)); - if (aggregations.isEmpty()) { - aggregationsAsMap = emptyMap(); - } - this.topLevelPipelineAggregators = in.readList( - stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class)); - } - - @Override - @SuppressWarnings("unchecked") - public void writeTo(StreamOutput out) throws IOException { - out.writeNamedWriteableList((List)aggregations); - out.writeNamedWriteableList(topLevelPipelineAggregators); - } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 490c7a3687844..5626c8f21859d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -60,7 +60,7 @@ protected InternalSingleBucketAggregation(String name, long docCount, InternalAg protected InternalSingleBucketAggregation(StreamInput in) throws IOException { super(in); docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index 8ce6304daf8ea..c862f458939ed 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -58,7 +58,7 @@ public InternalBucket(String key, long docCount, InternalAggregations aggregatio public InternalBucket(StreamInput in) throws IOException { key = in.readOptionalString(); docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index 715537597d906..e6a7edd8c217d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -237,7 +237,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern InternalBucket(StreamInput in, List sourceNames, List formats, int[] reverseMuls) throws IOException { this.key = new CompositeKey(in); this.docCount = in.readVLong(); - this.aggregations = InternalAggregations.readAggregations(in); + this.aggregations = new InternalAggregations(in); this.reverseMuls = reverseMuls; this.sourceNames = sourceNames; this.formats = formats; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java index 56cf71b82cfdd..f6ebfa459c02d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java @@ -57,7 +57,7 @@ public InternalBucket(StreamInput in, boolean keyed) throws IOException { this.keyed = keyed; key = in.readOptionalString(); docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java index ed699e5e3edb2..93002d607eaf8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java @@ -51,7 +51,7 @@ public InternalGeoGridBucket(long hashAsLong, long docCount, InternalAggregation public InternalGeoGridBucket(StreamInput in) throws IOException { hashAsLong = in.readLong(); docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index 63d08f5e832ac..dc0eefec45c82 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -73,7 +73,7 @@ public Bucket(StreamInput in, DocValueFormat format) throws IOException { this.format = format; key = in.readLong(); docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override @@ -175,7 +175,7 @@ static class BucketInfo { roundingInfos[i] = new RoundingInfo(in); } roundingIdx = in.readVInt(); - emptySubAggregations = InternalAggregations.readAggregations(in); + emptySubAggregations = new InternalAggregations(in); } void writeTo(StreamOutput out) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index c15182f97a106..162bfde5acf0f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -77,7 +77,7 @@ public Bucket(StreamInput in, boolean keyed, DocValueFormat format) throws IOExc this.keyed = keyed; key = in.readLong(); docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override @@ -186,7 +186,7 @@ static class EmptyBucketInfo { EmptyBucketInfo(StreamInput in) throws IOException { rounding = Rounding.read(in); - subAggregations = InternalAggregations.readAggregations(in); + subAggregations = new InternalAggregations(in); bounds = in.readOptionalWriteable(ExtendedBounds::new); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index dd942f5bf6be2..b324241e9bb6d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -73,7 +73,7 @@ public Bucket(StreamInput in, boolean keyed, DocValueFormat format) throws IOExc this.keyed = keyed; key = in.readDouble(); docCount = in.readVLong(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override @@ -178,7 +178,7 @@ static class EmptyBucketInfo { } EmptyBucketInfo(StreamInput in) throws IOException { - this(in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble(), InternalAggregations.readAggregations(in)); + this(in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble(), new InternalAggregations(in)); } public void writeTo(StreamOutput out) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java index 3caae6ac34505..35ee592d6c9ae 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java @@ -77,7 +77,7 @@ private static Bucket createFromStream(StreamInput in, DocValueFormat format, bo BytesRef from = in.readBoolean() ? in.readBytesRef() : null; BytesRef to = in.readBoolean() ? in.readBytesRef() : null; long docCount = in.readLong(); - InternalAggregations aggregations = InternalAggregations.readAggregations(in); + InternalAggregations aggregations = new InternalAggregations(in); return new Bucket(format, keyed, key, from, to, docCount, aggregations); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index 2427104d0c0f5..747cb22c87071 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -256,7 +256,7 @@ public InternalRange(StreamInput in) throws IOException { for (int i = 0; i < size; i++) { String key = in.readString(); ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(), - InternalAggregations.readAggregations(in), keyed, format)); + new InternalAggregations(in), keyed, format)); } this.ranges = ranges; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java index 2d22b61472a5a..fd4eec825774e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java @@ -59,7 +59,7 @@ static class Bucket extends InternalSignificantTerms.Bucket { supersetDf = in.readVLong(); term = in.readLong(); score = in.readDouble(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java index a73ee1818cf6a..551ecd6a9f23e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java @@ -57,7 +57,7 @@ public Bucket(StreamInput in, long subsetSize, long supersetSize, DocValueFormat subsetDf = in.readVLong(); supersetDf = in.readVLong(); score = in.readDouble(); - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 72a641ea5bb60..f40ff84bf2130 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -85,7 +85,7 @@ protected Bucket(StreamInput in, DocValueFormat formatter, boolean showDocCountE if (showDocCountError) { docCountError = in.readLong(); } - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java b/server/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java index bac7b6486bb88..323c41c110f93 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java +++ b/server/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java @@ -52,7 +52,7 @@ public InternalSearchResponse(SearchHits hits, InternalAggregations aggregations public InternalSearchResponse(StreamInput in) throws IOException { super( new SearchHits(in), - in.readBoolean() ? InternalAggregations.readAggregations(in) : null, + in.readBoolean() ? new InternalAggregations(in) : null, in.readBoolean() ? new Suggest(in) : null, in.readBoolean(), in.readOptionalBoolean(), @@ -64,7 +64,7 @@ public InternalSearchResponse(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { hits.writeTo(out); - out.writeOptionalStreamable((InternalAggregations)aggregations); + out.writeOptionalWriteable((InternalAggregations)aggregations); out.writeOptionalWriteable(suggest); out.writeBoolean(timedOut); out.writeOptionalBoolean(terminatedEarly); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 4ccfdd64b0d3c..16236c64291cc 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -284,7 +284,7 @@ public void readFromWithId(long id, StreamInput in) throws IOException { } setTopDocs(readTopDocs(in)); if (hasAggs = in.readBoolean()) { - aggregations = InternalAggregations.readAggregations(in); + aggregations = new InternalAggregations(in); } if (in.getVersion().before(Version.V_7_2_0)) { List pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream() diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java index aa244ff7a320b..e209a65cf3629 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java @@ -132,7 +132,7 @@ private void writeToAndReadFrom(InternalAggregations aggregations, int iteration aggregations.writeTo(out); try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), registry)) { in.setVersion(version); - InternalAggregations deserialized = InternalAggregations.readAggregations(in); + InternalAggregations deserialized = new InternalAggregations(in); assertEquals(aggregations.aggregations, deserialized.aggregations); if (aggregations.getTopLevelPipelineAggregators() == null) { assertEquals(0, deserialized.getTopLevelPipelineAggregators().size());