From 96d35aa27717b6d58bf6424e9615a4fb9eca0eea Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Fri, 27 Nov 2015 10:00:13 +0000 Subject: [PATCH] Aggregations Refactor: Refactor Serial Differencing Aggregation --- .../pipeline/serialdiff/SerialDiffParser.java | 24 ++-- .../SerialDiffPipelineAggregator.java | 104 ++++++++++++++++-- .../pipeline/SerialDifferenceTests.java | 47 ++++++++ 3 files changed, 155 insertions(+), 20 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/search/aggregations/pipeline/SerialDifferenceTests.java diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffParser.java index 45a252ded2b37..f05f0fc45f5ec 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffParser.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffParser.java @@ -25,8 +25,6 @@ import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; -import org.elasticsearch.search.aggregations.support.format.ValueFormat; -import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; @@ -50,8 +48,8 @@ public PipelineAggregatorFactory parse(String reducerName, XContentParser parser String currentFieldName = null; String[] bucketsPaths = null; String format = null; - GapPolicy gapPolicy = GapPolicy.SKIP; - int lag = 1; + GapPolicy gapPolicy = null; + Integer lag = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -102,20 +100,22 @@ public PipelineAggregatorFactory parse(String reducerName, XContentParser parser + "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation()); } - ValueFormatter formatter; + SerialDiffPipelineAggregator.Factory factory = new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths); + if (lag != null) { + factory.lag(lag); + } if (format != null) { - formatter = ValueFormat.Patternable.Number.format(format).formatter(); - } else { - formatter = ValueFormatter.RAW; + factory.format(format); } - - return new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths, formatter, gapPolicy, lag); + if (gapPolicy != null) { + factory.gapPolicy(gapPolicy); + } + return factory; } - // NORELEASE implement this method when refactoring this aggregation @Override public PipelineAggregatorFactory getFactoryPrototype() { - return null; + return new SerialDiffPipelineAggregator.Factory(null, null); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java index 5df97d336c9e6..7716c76821c66 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java @@ -23,15 +23,18 @@ import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation.Type; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.support.format.ValueFormat; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; @@ -39,10 +42,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; public class SerialDiffPipelineAggregator extends PipelineAggregator { @@ -144,20 +147,105 @@ public void doWriteTo(StreamOutput out) throws IOException { public static class Factory extends PipelineAggregatorFactory { - private final ValueFormatter formatter; - private GapPolicy gapPolicy; - private int lag; + private String format; + private GapPolicy gapPolicy = GapPolicy.SKIP; + private int lag = 1; - public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, int lag) { + public Factory(String name, String[] bucketsPaths) { super(name, TYPE.name(), bucketsPaths); - this.formatter = formatter; - this.gapPolicy = gapPolicy; + } + + /** + * Sets the lag to use when calculating the serial difference. + */ + public void lag(int lag) { this.lag = lag; } + /** + * Gets the lag to use when calculating the serial difference. + */ + public int lag() { + return lag; + } + + /** + * Sets the format to use on the output of this aggregation. + */ + public void format(String format) { + this.format = format; + } + + /** + * Gets the format to use on the output of this aggregation. + */ + public String format() { + return format; + } + + /** + * Sets the GapPolicy to use on the output of this aggregation. + */ + public void gapPolicy(GapPolicy gapPolicy) { + this.gapPolicy = gapPolicy; + } + + /** + * Gets the GapPolicy to use on the output of this aggregation. + */ + public GapPolicy gapPolicy() { + return gapPolicy; + } + + protected ValueFormatter formatter() { + if (format != null) { + return ValueFormat.Patternable.Number.format(format).formatter(); + } else { + return ValueFormatter.RAW; + } + } + @Override protected PipelineAggregator createInternal(Map metaData) throws IOException { - return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, lag, metaData); + return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter(), gapPolicy, lag, metaData); + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(SerialDiffParser.FORMAT.getPreferredName(), format); + } + builder.field(SerialDiffParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); + builder.field(SerialDiffParser.LAG.getPreferredName(), lag); + return builder; + } + + @Override + protected PipelineAggregatorFactory doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException { + Factory factory = new Factory(name, bucketsPaths); + factory.format = in.readOptionalString(); + factory.gapPolicy = GapPolicy.readFrom(in); + factory.lag = in.readVInt(); + return factory; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalString(format); + gapPolicy.writeTo(out); + out.writeVInt(lag); + } + + @Override + protected int doHashCode() { + return Objects.hash(format, gapPolicy, lag); + } + @Override + protected boolean doEquals(Object obj) { + Factory other = (Factory) obj; + return Objects.equals(format, other.format) + && Objects.equals(gapPolicy, other.gapPolicy) + && Objects.equals(lag, other.lag); } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/SerialDifferenceTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/SerialDifferenceTests.java new file mode 100644 index 0000000000000..03ec5b971ce7d --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/SerialDifferenceTests.java @@ -0,0 +1,47 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator.Factory; + +public class SerialDifferenceTests extends BasePipelineAggregationTestCase { + + @Override + protected Factory createTestAggregatorFactory() { + String name = randomAsciiOfLengthBetween(3, 20); + String[] bucketsPaths = new String[1]; + bucketsPaths[0] = randomAsciiOfLengthBetween(3, 20); + Factory factory = new Factory(name, bucketsPaths); + if (randomBoolean()) { + factory.format(randomAsciiOfLengthBetween(1, 10)); + } + if (randomBoolean()) { + factory.gapPolicy(randomFrom(GapPolicy.values())); + } + if (randomBoolean()) { + factory.lag(randomIntBetween(1, 1000)); + } + return factory; + } + +}