-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Serialize top-level pipeline aggs as part of InternalAggregations #40177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.elasticsearch.search.aggregations; | ||
|
|
||
| import org.elasticsearch.Version; | ||
| import org.elasticsearch.common.io.stream.BytesStreamOutput; | ||
| import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; | ||
| import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.search.DocValueFormat; | ||
| import org.elasticsearch.search.SearchModule; | ||
| import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; | ||
| import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogramTests; | ||
| import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; | ||
| import org.elasticsearch.search.aggregations.bucket.terms.StringTermsTests; | ||
| import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; | ||
| import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; | ||
| import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests; | ||
| import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; | ||
| import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; | ||
| import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; | ||
| import org.elasticsearch.test.ESTestCase; | ||
| import org.elasticsearch.test.VersionUtils; | ||
| import org.hamcrest.Matchers; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Base64; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| public class InternalAggregationsTests extends ESTestCase { | ||
|
|
||
    // Registry of the named writeables contributed by the search module; required so
    // that deserialization can look up aggregation implementations by their wire name.
    private final NamedWriteableRegistry registry = new NamedWriteableRegistry(
        new SearchModule(Settings.EMPTY, false, Collections.emptyList()).getNamedWriteables());
|
|
||
| public void testReduceEmptyAggs() { | ||
| List<InternalAggregations> aggs = Collections.emptyList(); | ||
| InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean()); | ||
| assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext)); | ||
| } | ||
|
|
||
| public void testNonFinalReduceTopLevelPipelineAggs() throws IOException { | ||
| InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), | ||
| 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); | ||
| List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms))); | ||
| List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>(); | ||
| MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); | ||
| topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create()); | ||
| InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false); | ||
| InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext); | ||
| assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size()); | ||
| assertEquals(1, reducedAggs.aggregations.size()); | ||
| } | ||
|
|
||
| public void testFinalReduceTopLevelPipelineAggs() throws IOException { | ||
| InternalAggregation terms = new StringTerms("name", BucketOrder.key(true), | ||
| 10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0); | ||
|
|
||
| MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test"); | ||
| SiblingPipelineAggregator siblingPipelineAggregator = (SiblingPipelineAggregator) maxBucketPipelineAggregationBuilder.create(); | ||
| InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, true); | ||
| final InternalAggregations reducedAggs; | ||
| if (randomBoolean()) { | ||
| InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms), | ||
| Collections.singletonList(siblingPipelineAggregator)); | ||
| reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext); | ||
| } else { | ||
| InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms)); | ||
| List<SiblingPipelineAggregator> topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator); | ||
| reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext); | ||
| } | ||
| assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size()); | ||
| assertEquals(2, reducedAggs.aggregations.size()); | ||
| } | ||
|
|
||
| public void testSerialization() throws Exception { | ||
| List<InternalAggregation> aggsList = new ArrayList<>(); | ||
| if (randomBoolean()) { | ||
| StringTermsTests stringTermsTests = new StringTermsTests(); | ||
| stringTermsTests.init(); | ||
| stringTermsTests.setUp(); | ||
| aggsList.add(stringTermsTests.createTestInstance()); | ||
| } | ||
| if (randomBoolean()) { | ||
| InternalDateHistogramTests dateHistogramTests = new InternalDateHistogramTests(); | ||
| dateHistogramTests.setUp(); | ||
| aggsList.add(dateHistogramTests.createTestInstance()); | ||
| } | ||
| if (randomBoolean()) { | ||
| InternalSimpleValueTests simpleValueTests = new InternalSimpleValueTests(); | ||
| aggsList.add(simpleValueTests.createTestInstance()); | ||
| } | ||
| List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>(); | ||
| if (randomBoolean()) { | ||
| if (randomBoolean()) { | ||
| topLevelPipelineAggs.add((SiblingPipelineAggregator)new MaxBucketPipelineAggregationBuilder("name1", "bucket1").create()); | ||
| } | ||
| if (randomBoolean()) { | ||
| topLevelPipelineAggs.add((SiblingPipelineAggregator)new AvgBucketPipelineAggregationBuilder("name2", "bucket2").create()); | ||
| } | ||
| if (randomBoolean()) { | ||
| topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create()); | ||
| } | ||
| } | ||
| InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs); | ||
| writeToAndReadFrom(aggregations, 0); | ||
| } | ||
|
|
||
    /**
     * Round-trips {@code aggregations} through the wire format at a random
     * version and verifies what comes back, then recurses (up to three total
     * rounds) to make sure we can write again what we just read.
     *
     * @param aggregations the instance to serialize
     * @param iteration    recursion depth; recursion stops once it reaches 2
     */
    private void writeToAndReadFrom(InternalAggregations aggregations, int iteration) throws IOException {
        Version version = VersionUtils.randomVersion(random());
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.setVersion(version);
            aggregations.writeTo(out);
            try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), registry)) {
                in.setVersion(version);
                InternalAggregations deserialized = InternalAggregations.readAggregations(in);
                assertEquals(aggregations.aggregations, deserialized.aggregations);
                if (aggregations.getTopLevelPipelineAggregators() == null) {
                    // a null pipeline-agg list is expected to come back as an empty list
                    assertEquals(0, deserialized.getTopLevelPipelineAggregators().size());
                } else {
                    //TODO update version after backport
                    if (version.before(Version.V_8_0_0)) {
                        // older wire versions do not carry top-level pipeline aggregators
                        assertEquals(0, deserialized.getTopLevelPipelineAggregators().size());
                    } else {
                        // newer wire versions carry them over: compare name and
                        // bucketsPaths field by field for each aggregator
                        assertEquals(aggregations.getTopLevelPipelineAggregators().size(),
                            deserialized.getTopLevelPipelineAggregators().size());
                        for (int i = 0; i < aggregations.getTopLevelPipelineAggregators().size(); i++) {
                            SiblingPipelineAggregator siblingPipelineAggregator1 = aggregations.getTopLevelPipelineAggregators().get(i);
                            SiblingPipelineAggregator siblingPipelineAggregator2 = deserialized.getTopLevelPipelineAggregators().get(i);
                            assertArrayEquals(siblingPipelineAggregator1.bucketsPaths(), siblingPipelineAggregator2.bucketsPaths());
                            assertEquals(siblingPipelineAggregator1.name(), siblingPipelineAggregator2.name());
                        }
                    }
                }
                if (iteration < 2) {
                    //serialize this enough times to make sure that we are able to write again what we read
                    writeToAndReadFrom(deserialized, iteration + 1);
                }
            }
        }
    }
|
|
||
| //TODO update version and rename after backport | ||
| public void testSerializationFromPre_8_0_0() throws IOException { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the intent of this test and I know that we have similar tests elsewhere but I think it should be moved to a rest test or omitted if we are confident that the existing rest tests are enough to test the bwc serialization. This test checks an internal class that we are allowed to change in a minor release (even a patch release) so I don't think we should use a static representation of the serialization that we'll need to change every time we make a modification to the serialization.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that yaml tests are overkill for this matter, as they are integration tests and take much longer to run. After the backport, this static version of the object is the binary representation of how we serialized the object prior to 6.7.0 (6.7.1 depending on what release the PR makes), which I am pretty sure we will not change. I can add a comment.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
They are overkill if we write them only to test serialization, but since we have some CCS rest tests already it shouldn't be too costly to add one that checks the support for pipeline aggregations. I also agree that we will probably not change the serialization of this class in 6.7.x, but my point was more about the general idea of adding serialized bytes from a previous version in a unit test.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I plan to do integration tests for this scenario as part of #40038, and I wanted to add coverage there for the field collapsing bug as well. I prefer the new Java test over the yaml ones personally. But our current CCS integration tests don't run against multiple versions, while this unit test makes sure that we can read something that was written from e.g. 6.6, compared to simulating that by calling readFrom on master and setting the version to 6.6. Do you see what I mean? Or am I missing something?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the intent but I forgot that we don't run the bwc tests in every module, let's leave it like this for now and we can discuss further in #40038 |
||
        // Static base64-encoded wire representation of an InternalAggregations
        // instance captured from an older version, used to prove the current
        // code can still read the pre-existing binary format. The assertions
        // below show it holds a terms, a date_histogram and a simple_value agg.
        String aggsString = "AwZzdGVybXMFb0F0Q0EKCQVsZG5ncgAFeG56RWcFeUFxVmcABXBhQVVpBUtYc2VIAAVaclRESwVqUkxySAAFelp5d1AFRUREcEYABW1" +
            "sckF0BU5wWWVFAAVJYVJmZgVURlJVbgAFT0RiU04FUWNwSVoABU1sb09HBUNzZHFlAAVWWmJHaQABAwGIDgNyYXcFAQAADmRhdGVfaGlzdG9ncmFt" +
            "BVhHbVl4/wADAAKAurcDA1VUQwABAQAAAWmOhukAAQAAAWmR9dEAAAAAAAAAAAAAAANyYXcACAAAAWmQrDoAUQAAAAFpkRoXAEMAAAABaZGH9AAtA" +
            "AAAAWmR9dEAJwAAAAFpkmOuAFwAAAABaZLRiwAYAAAAAWmTP2gAKgAAAAFpk61FABsADHNpbXBsZV92YWx1ZQVsWVNLVv8AB2RlY2ltYWwGIyMjLi" +
            "MjQLZWZVy5zBYAAAAAAAAAAAAAAAAAAAAAAAAA";

        byte[] aggsBytes = Base64.getDecoder().decode(aggsString);
        try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(aggsBytes), registry)) {
            // read with a random supported wire version older than the current one
            in.setVersion(VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(),
                Version.max(Version.CURRENT.minimumCompatibilityVersion(), VersionUtils.getPreviousVersion(Version.CURRENT))));
            InternalAggregations deserialized = InternalAggregations.readAggregations(in);
            assertEquals(3, deserialized.aggregations.size());
            assertThat(deserialized.aggregations.get(0), Matchers.instanceOf(StringTerms.class));
            assertThat(deserialized.aggregations.get(1), Matchers.instanceOf(InternalDateHistogram.class));
            assertThat(deserialized.aggregations.get(2), Matchers.instanceOf(InternalSimpleValue.class));
        }
    }
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other constructor could call this one with:
`this(aggregations, Collections.emptyList())`, ensuring that the `topLevelPipelineAggregators` list is never `null`? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added to my TODO list to convert this class to Writeable.