diff --git a/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java b/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java index 01685535a4e0e..31f7a8dc99c6b 100644 --- a/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java @@ -44,8 +44,10 @@ import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.fetch.subphase.highlight.Highlighter; import org.elasticsearch.search.suggest.Suggester; @@ -84,6 +86,13 @@ default List> getMovingAverageModels() { return emptyList(); } + /** + * The new {@link MovModel}s defined by this plugin. {@linkplain MovModel}s are used by the {@link MovFunctionPipelineAggregator} to + * execute functions on windows of data. + */ + default List> getMovingFunctionModels() { + return emptyList(); + } /** * The new {@link FetchSubPhase}s defined by this plugin. */ diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 16bd9dbe8b931..2abc482d5897d 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -206,14 +206,21 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.moving.models.EwmaModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltLinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltWintersModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.LinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MaxModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MedianModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MinModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SumModel; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; import org.elasticsearch.search.fetch.FetchPhase; @@ -269,6 +276,8 @@ public class SearchModule { "significance_heuristic"); private final ParseFieldRegistry movingAverageModelParserRegistry = new ParseFieldRegistry<>( "moving_avg_model"); + private final ParseFieldRegistry movingFunctionModelParserRegistry = new ParseFieldRegistry<>( + "moving_function_model"); private final List fetchSubPhases = new ArrayList<>(); @@ -288,6 +297,7 @@ public SearchModule(Settings settings, boolean transportClient, List getMovingAverageModel return movingAverageModelParserRegistry; } + /** + * The registry of {@link MovModel}s. + */ + public ParseFieldRegistry getMovingFunctionModelParserRegistry() { + return movingFunctionModelParserRegistry; + } + private void registerAggregations(List plugins) { registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder::parse) .addResultReader(InternalAvg::new)); @@ -481,6 +498,12 @@ private void registerPipelineAggregations(List plugins) { MovAvgPipelineAggregator::new, (n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c)) /* Uses InternalHistogram for buckets */); + registerPipelineAggregation(new PipelineAggregationSpec( + MovFunctionPipelineAggregationBuilder.NAME, + MovFunctionPipelineAggregationBuilder::new, + MovFunctionPipelineAggregator::new, + (n, c) -> MovFunctionPipelineAggregationBuilder.parse(movingFunctionModelParserRegistry, n, c)) + /* Uses InternalHistogram for buckets */); registerPipelineAggregation(new PipelineAggregationSpec( CumulativeSumPipelineAggregationBuilder.NAME, CumulativeSumPipelineAggregationBuilder::new, @@ -660,6 +683,21 @@ private void registerMovingAverageModel(SearchExtensionSpec plugins) { + registerMovingFunctionModel(new SearchExtensionSpec<>(MaxModel.NAME, MaxModel::new, MaxModel.PARSER)); + registerMovingFunctionModel(new SearchExtensionSpec<>(MinModel.NAME, MinModel::new, MinModel.PARSER)); + registerMovingFunctionModel(new SearchExtensionSpec<>(MedianModel.NAME, MedianModel::new, MedianModel.PARSER)); + registerMovingFunctionModel(new SearchExtensionSpec<>(SumModel.NAME, SumModel::new, SumModel.PARSER)); + + registerFromPlugin(plugins, SearchPlugin::getMovingFunctionModels, this::registerMovingFunctionModel); + } + + private void registerMovingFunctionModel(SearchExtensionSpec movModel) { + movingFunctionModelParserRegistry.register(movModel.getParser(), movModel.getName()); + namedWriteables.add( + new NamedWriteableRegistry.Entry(MovModel.class, movModel.getName().getPreferredName(), movModel.getReader())); + } + private void registerFetchSubPhases(List plugins) { registerFetchSubPhase(new ExplainFetchSubPhase()); registerFetchSubPhase(new DocValueFieldsFetchSubPhase()); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index ce738d7a61899..5e639145b5fa2 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -31,7 +31,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder; import java.util.Map; @@ -79,6 +80,10 @@ public static MovAvgPipelineAggregationBuilder movingAvg(String name, String buc return new MovAvgPipelineAggregationBuilder(name, bucketsPath); } + public static MovFunctionPipelineAggregationBuilder movingFunction(String name, String bucketsPath, Script script) { + return new MovFunctionPipelineAggregationBuilder(name, bucketsPath, script); + } + public static BucketScriptPipelineAggregationBuilder bucketScript(String name, Map bucketsPathsMap, Script script) { return new BucketScriptPipelineAggregationBuilder(name, bucketsPathsMap, script); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovAvgPipelineAggregationBuilder.java similarity index 94% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovAvgPipelineAggregationBuilder.java index bc973ad442f54..a5bd265544f3c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovAvgPipelineAggregationBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg; +package org.elasticsearch.search.aggregations.pipeline.moving; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParsingException; @@ -35,9 +35,10 @@ import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModelBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel; import java.io.IOException; import java.text.ParseException; @@ -53,9 +54,6 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { public static final String NAME = "moving_avg"; - public static final ParseField MODEL = new ParseField("model"); - private static final ParseField WINDOW = new ParseField("window"); - public static final ParseField SETTINGS = new ParseField("settings"); private static final ParseField PREDICT = new ParseField("predict"); private static final ParseField MINIMIZE = new ParseField("minimize"); @@ -169,11 +167,11 @@ public int window() { * @param model * A MovAvgModel which has been prepopulated with settings */ - public MovAvgPipelineAggregationBuilder modelBuilder(MovAvgModelBuilder model) { + public MovAvgPipelineAggregationBuilder modelBuilder(MovModelBuilder model) { if (model == null) { throw new IllegalArgumentException("[model] must not be null: [" + name + "]"); } - this.model = model.build(); + this.model = (MovAvgModel)model.build(); return this; } @@ -292,7 +290,7 @@ protected XContentBuilder internalXContent(XContentBuilder builder, Params param } builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName()); model.toXContent(builder, params); - builder.field(WINDOW.getPreferredName(), window); + builder.field(MovModel.WINDOW.getPreferredName(), window); if (predict > 0) { builder.field(PREDICT.getPreferredName(), predict); } @@ -322,7 +320,7 @@ public static MovAvgPipelineAggregationBuilder parse( if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token == XContentParser.Token.VALUE_NUMBER) { - if (WINDOW.match(currentFieldName)) { + if (MovModel.WINDOW.match(currentFieldName)) { window = parser.intValue(); if (window <= 0) { throw new ParsingException(parser.getTokenLocation(), "[" + currentFieldName + "] value must be a positive, " @@ -345,7 +343,7 @@ public static MovAvgPipelineAggregationBuilder parse( bucketsPaths = new String[] { parser.text() }; } else if (GAP_POLICY.match(currentFieldName)) { gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation()); - } else if (MODEL.match(currentFieldName)) { + } else if (MovModel.MODEL.match(currentFieldName)) { model = parser.text(); } else { throw new ParsingException(parser.getTokenLocation(), @@ -364,7 +362,7 @@ public static MovAvgPipelineAggregationBuilder parse( "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); } } else if (token == XContentParser.Token.START_OBJECT) { - if (SETTINGS.match(currentFieldName)) { + if (MovModel.SETTINGS.match(currentFieldName)) { settings = parser.map(); } else { throw new ParsingException(parser.getTokenLocation(), @@ -406,7 +404,7 @@ public static MovAvgPipelineAggregationBuilder parse( MovAvgModel.AbstractModelParser modelParser = movingAverageMdelParserRegistry.lookup(model, parser.getTokenLocation()); MovAvgModel movAvgModel; try { - movAvgModel = modelParser.parse(settings, pipelineAggregatorName, factory.window()); + movAvgModel = (MovAvgModel) modelParser.parse(settings, pipelineAggregatorName, factory.window()); } catch (ParseException exception) { throw new ParsingException(parser.getTokenLocation(), "Could not parse settings for model [" + model + "].", exception); } @@ -438,4 +436,4 @@ protected boolean doEquals(Object obj) { public String getWriteableName() { return NAME; } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovAvgPipelineAggregator.java similarity index 98% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovAvgPipelineAggregator.java index 196f7cca4737f..93ff02bef0fd0 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovAvgPipelineAggregator.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg; +package org.elasticsearch.search.aggregations.pipeline.moving; import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.common.io.stream.StreamInput; @@ -33,7 +33,7 @@ import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel; import java.io.IOException; import java.util.ArrayList; @@ -124,6 +124,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext // Some models (e.g. HoltWinters) have certain preconditions that must be met if (model.hasValue(values.size())) { + double movavg = model.next(values); List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map((p) -> { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovFunctionPipelineAggregationBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovFunctionPipelineAggregationBuilder.java new file mode 100644 index 0000000000000..99426feef4eec --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovFunctionPipelineAggregationBuilder.java @@ -0,0 +1,386 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving; + +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ParseFieldRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModelBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel; + +import java.io.IOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY; + +public class MovFunctionPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { + public static final String NAME = "moving_fn"; + + private String format; + private GapPolicy gapPolicy = GapPolicy.SKIP; + private int window = 5; + private MovModel function = null; + private Script script = null; + + public MovFunctionPipelineAggregationBuilder(String name, String bucketsPath, Script script) { + super(name, NAME, new String[] { bucketsPath }); + this.script = script; + } + + /** + * Read from a stream. + */ + public MovFunctionPipelineAggregationBuilder(StreamInput in) throws IOException { + super(in, NAME); + format = in.readOptionalString(); + gapPolicy = GapPolicy.readFrom(in); + window = in.readVInt(); + function = in.readOptionalNamedWriteable(MovModel.class); + script = in.readOptionalWriteable(Script::new); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalString(format); + gapPolicy.writeTo(out); + out.writeVInt(window); + out.writeOptionalNamedWriteable(function); + out.writeOptionalWriteable(script); + } + + /** + * Sets the format to use on the output of this aggregation. + */ + public MovFunctionPipelineAggregationBuilder format(String format) { + if (format == null) { + throw new IllegalArgumentException("[format] must not be null: [" + name + "]"); + } + this.format = format; + return this; + } + + /** + * Gets the format to use on the output of this aggregation. + */ + public String format() { + return format; + } + + /** + * Sets the GapPolicy to use on the output of this aggregation. + */ + public MovFunctionPipelineAggregationBuilder gapPolicy(GapPolicy gapPolicy) { + if (gapPolicy == null) { + throw new IllegalArgumentException("[gapPolicy] must not be null: [" + name + "]"); + } + this.gapPolicy = gapPolicy; + return this; + } + + /** + * Gets the GapPolicy to use on the output of this aggregation. + */ + public GapPolicy gapPolicy() { + return gapPolicy; + } + + protected DocValueFormat formatter() { + if (format != null) { + return new DocValueFormat.Decimal(format); + } else { + return DocValueFormat.RAW; + } + } + + /** + * Sets the window size for the moving fn. This window will "slide" + * across the series, and the values inside that window will be used to + * calculate the moving function value + * + * @param window + * Size of window + */ + public MovFunctionPipelineAggregationBuilder window(int window) { + if (window <= 0) { + throw new IllegalArgumentException("[window] must be a positive integer: [" + name + "]"); + } + this.window = window; + return this; + } + + /** + * Gets the window size for the moving function. This window will "slide" + * across the series, and the values inside that window will be used to + * calculate the moving function value + */ + public int window() { + return window; + } + + /** + * Sets a MovModel for the Moving Function. The function is used to + * define what type of moving fn you want to use on the series + * + * @param function + * A MovModel which has been prepopulated with settings + */ + public MovFunctionPipelineAggregationBuilder modelBuilder(MovModelBuilder function) { + if (function == null) { + throw new IllegalArgumentException("[function] must not be null: [" + name + "]"); + } + this.function = function.build(); + return this; + } + + /** + * Sets a MovModel for the moving function. The function is used to + * define what type of moving fn you want to use on the series + * + * @param function + * A MovModel which has been prepopulated with settings + */ + public MovFunctionPipelineAggregationBuilder function(MovModel function) { + if (function == null) { + throw new IllegalArgumentException("[function] must not be null: [" + name + "]"); + } + this.function = function; + return this; + } + + /** + * Gets a MovModel for the moving function. The function is used to + * define what type of moving fn you want to use on the series + */ + public MovModel function() { + return function; + } + + public Script getScript() { + return script; + } + + public MovFunctionPipelineAggregationBuilder setScript(Script script) { + this.script = script; + return this; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new MovFunctionPipelineAggregator(name, bucketsPaths, formatter(), gapPolicy, window, function, + script, metaData); + } + + @Override + public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, + List pipelineAggregatoractories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for aggregation [" + name + "]"); + } + if (parent instanceof HistogramAggregatorFactory) { + HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent; + if (histoParent.minDocCount() != 0) { + throw new IllegalStateException("parent histogram of moving function aggregation [" + name + + "] must have min_doc_count of 0"); + } + } else if (parent instanceof DateHistogramAggregatorFactory) { + DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent; + if (histoParent.minDocCount() != 0) { + throw new IllegalStateException("parent histogram of moving function aggregation [" + name + + "] must have min_doc_count of 0"); + } + } else { + throw new IllegalStateException("moving function aggregation [" + name + + "] must have a histogram or date_histogram as parent"); + } + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(FORMAT.getPreferredName(), format); + } + builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName()); + if (function != null) { + function.toXContent(builder, params); + } + if (script != null) { + builder.field(Script.SCRIPT_PARSE_FIELD.getPreferredName()); + script.toXContent(builder, params); + } + builder.field(MovModel.WINDOW.getPreferredName(), window); + return builder; + } + + public static MovFunctionPipelineAggregationBuilder parse( + ParseFieldRegistry movingFunctionModelParserRegistry, + String pipelineAggregatorName, QueryParseContext context) throws IOException { + XContentParser parser = context.parser(); + XContentParser.Token token; + String currentFieldName = null; + String[] bucketsPaths = null; + String format = null; + + GapPolicy gapPolicy = null; + Integer window = null; + Map settings = null; + String model = null; + Script script = null; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (MovModel.WINDOW.match(currentFieldName)) { + window = parser.intValue(); + if (window <= 0) { + throw new ParsingException(parser.getTokenLocation(), "[" + currentFieldName + "] value must be a positive, " + + "non-zero integer. Value supplied was [" + window + "] in [" + pipelineAggregatorName + "]."); + } + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (FORMAT.match(currentFieldName)) { + format = parser.text(); + } else if (BUCKETS_PATH.match(currentFieldName)) { + bucketsPaths = new String[] { parser.text() }; + } else if (GAP_POLICY.match(currentFieldName)) { + gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation()); + } else if (MovModel.FUNCTION.match(currentFieldName)) { + if (script != null) { + throw new ParsingException(parser.getTokenLocation(), + "A function and script cannot both be defined for MovingFunction aggregation."); + } + model = parser.text(); + } else if (Script.SCRIPT_PARSE_FIELD.match(currentFieldName)) { + if (model != null) { + throw new ParsingException(parser.getTokenLocation(), + "A function and script cannot both be defined for MovingFunction aggregation."); + } + script = Script.parse(parser); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (BUCKETS_PATH.match(currentFieldName)) { + List paths = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + String path = parser.text(); + paths.add(path); + } + bucketsPaths = paths.toArray(new String[paths.size()]); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (MovModel.SETTINGS.match(currentFieldName)) { + settings = parser.map(); + } else if (Script.SCRIPT_PARSE_FIELD.match(currentFieldName)) { + if (model != null) { + throw new ParsingException(parser.getTokenLocation(), + "A function and script cannot both be defined for MovingFunction aggregation."); + } + script = Script.parse(parser); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "]."); + } + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unexpected token " + token + " in [" + pipelineAggregatorName + "]."); + } + } + + if (bucketsPaths == null) { + throw new ParsingException(parser.getTokenLocation(), "Missing required field [" + BUCKETS_PATH.getPreferredName() + + "] for moving_function aggregation [" + pipelineAggregatorName + "]"); + } + + MovFunctionPipelineAggregationBuilder factory = + new MovFunctionPipelineAggregationBuilder(pipelineAggregatorName, bucketsPaths[0], script); + if (format != null) { + factory.format(format); + } + if (gapPolicy != null) { + factory.gapPolicy(gapPolicy); + } + if (window != null) { + factory.window(window); + } + if (model == null && script == null) { + throw new RuntimeException("A function or script must be defined for the MovingFunction aggregation."); + } + if (model != null) { + MovModel.AbstractModelParser modelParser = movingFunctionModelParserRegistry.lookup(model, parser.getTokenLocation()); + MovModel movModel; + try { + movModel = modelParser.parse(settings, pipelineAggregatorName, factory.window()); + } catch (ParseException exception) { + throw new ParsingException(parser.getTokenLocation(), "Could not parse settings for function [" + model + "].", exception); + } + factory.function(movModel); + } + + return factory; + } + + @Override + protected int doHashCode() { + return Objects.hash(format, gapPolicy, window, function); + } + + @Override + protected boolean doEquals(Object obj) { + MovFunctionPipelineAggregationBuilder other = (MovFunctionPipelineAggregationBuilder) obj; + return Objects.equals(format, other.format) + && Objects.equals(gapPolicy, other.gapPolicy) + && Objects.equals(window, other.window) + && Objects.equals(function, other.function) + && Objects.equals(script, other.script); + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovFunctionPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovFunctionPipelineAggregator.java new file mode 100644 index 0000000000000..5fa87065b0f2c --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/MovFunctionPipelineAggregator.java @@ -0,0 +1,165 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving; + +import org.elasticsearch.common.collect.EvictingQueue; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public class MovFunctionPipelineAggregator extends PipelineAggregator { + private final DocValueFormat formatter; + private final GapPolicy gapPolicy; + private final int window; + private MovModel function; + private final Script script; + + MovFunctionPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, GapPolicy gapPolicy, + int window, MovModel function, Script script, Map metadata) { + super(name, bucketsPaths, metadata); + this.formatter = formatter; + this.gapPolicy = gapPolicy; + this.window = window; + this.function = function; + this.script = script; + } + + /** + * Read from a stream. + */ + public MovFunctionPipelineAggregator(StreamInput in) throws IOException { + super(in); + formatter = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = GapPolicy.readFrom(in); + window = in.readVInt(); + function = in.readOptionalNamedWriteable(MovModel.class); + script = in.readOptionalWriteable(Script::new); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + out.writeNamedWriteable(formatter); + gapPolicy.writeTo(out); + out.writeVInt(window); + out.writeOptionalNamedWriteable(function); + out.writeOptionalWriteable(script); + } + + @Override + public String getWriteableName() { + return MovFunctionPipelineAggregationBuilder.NAME; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalMultiBucketAggregation + histo = (InternalMultiBucketAggregation) aggregation; + List buckets = histo.getBuckets(); + HistogramFactory factory = (HistogramFactory) histo; + + List newBuckets = new ArrayList<>(); + EvictingQueue values = new EvictingQueue<>(this.window); + + // For script, if it exists + ExecutableScript executableScript = null; + + if (script != null) { + Map vars = new HashMap<>(); + ExecutableScript.Factory scriptFactory = reduceContext.scriptService() + .compile(script, ExecutableScript.AGGS_CONTEXT); + + if (script.getParams() != null) { + vars.putAll(script.getParams()); + } + vars.put("values", values.toArray(new Double[values.size()])); + executableScript = scriptFactory.newInstance(vars); + } + + for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) { + Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); + + // Default is to reuse existing bucket. Simplifies the rest of the logic, + // since we only change newBucket if we can add to it + Bucket newBucket = bucket; + + if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) { + + if (values.size() > 0) { + double value = Double.NaN; + if (executableScript != null) { + Object returned = executableScript.run(); + + // Scripts are allowed to return null, that will just skip + // the output for this bucket. Add the existing bucket, add the value + // and continue to next bucket + if (returned == null) { + newBuckets.add(newBucket); + values.offer(thisBucketValue); + continue; + } + + if (!(returned instanceof Number)) { + throw new AggregationExecutionException("Script for MovingFunction [" + name() + + "] must return a Number"); + } + value = ((Number) returned).doubleValue(); + + } else { + value = function.next(values); + } + + List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) + .map((p) -> (InternalAggregation) p) + .collect(Collectors.toList()); + aggs.add(new InternalSimpleValue(name(), value, formatter, new ArrayList<>(), metaData())); + newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), + new InternalAggregations(aggs)); + } + values.offer(thisBucketValue); + } + newBuckets.add(newBucket); + } + return factory.createAggregation(newBuckets); + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/SimulatedAnealingMinimizer.java similarity index 97% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/SimulatedAnealingMinimizer.java index 711ee2299cffc..52032ca8b4720 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/SimulatedAnealingMinimizer.java @@ -17,10 +17,10 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg; +package org.elasticsearch.search.aggregations.pipeline.moving; import org.elasticsearch.common.collect.EvictingQueue; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel; /** * A cost minimizer which will fit a MovAvgModel to the data. diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/EwmaModel.java similarity index 84% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/EwmaModel.java index 26fb0333b188b..a009758d991ed 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/EwmaModel.java @@ -17,13 +17,12 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg.models; +package org.elasticsearch.search.aggregations.pipeline.moving.models; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; import java.io.IOException; import java.text.ParseException; @@ -32,6 +31,7 @@ import java.util.Map; import java.util.Objects; + /** * Calculate a exponentially weighted moving average */ @@ -90,7 +90,7 @@ public MovAvgModel clone() { } @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; // EWMA just emits the same final prediction repeatedly. @@ -100,16 +100,16 @@ protected double[] doPredict(Collection values, int numPre } @Override - public double next(Collection values) { + public double next(Collection values) { double avg = 0; boolean first = true; - for (T v : values) { + for (Double v : values) { if (first) { - avg = v.doubleValue(); + avg = v; first = false; } else { - avg = (v.doubleValue() * alpha) + (avg * (1 - alpha)); + avg = (v * alpha) + (avg * (1 - alpha)); } } return avg; @@ -117,8 +117,8 @@ public double next(Collection values) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); - builder.startObject(MovAvgPipelineAggregationBuilder.SETTINGS.getPreferredName()); + builder.field(MovModel.MODEL.getPreferredName(), NAME); + builder.startObject(MovModel.SETTINGS.getPreferredName()); builder.field("alpha", alpha); builder.endObject(); return builder; @@ -150,7 +150,7 @@ public boolean equals(Object obj) { return Objects.equals(alpha, other.alpha); } - public static class EWMAModelBuilder implements MovAvgModelBuilder { + public static class EWMAModelBuilder implements MovModelBuilder { private double alpha = DEFAULT_ALPHA; @@ -170,8 +170,8 @@ public EWMAModelBuilder alpha(double alpha) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); - builder.startObject(MovAvgPipelineAggregationBuilder.SETTINGS.getPreferredName()); + builder.field(MovModel.MODEL.getPreferredName(), NAME); + builder.startObject(MovModel.SETTINGS.getPreferredName()); builder.field("alpha", alpha); builder.endObject(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/HoltLinearModel.java similarity index 90% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/HoltLinearModel.java index 1819333738502..410f588ca9679 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/HoltLinearModel.java @@ -17,13 +17,12 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg.models; +package org.elasticsearch.search.aggregations.pipeline.moving.models; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; import java.io.IOException; import java.text.ParseException; @@ -116,16 +115,15 @@ public MovAvgModel clone() { * * @param values Collection of numerics to movingAvg, usually windowed * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric * @return Returns an array of doubles, since most smoothing methods operate on floating points */ @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { return next(values, numPredictions); } @Override - public double next(Collection values) { + public double next(Collection values) { return next(values, 1)[0]; } @@ -180,8 +178,8 @@ public double[] next(Collection values, int numForecasts) @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); - builder.startObject(MovAvgPipelineAggregationBuilder.SETTINGS.getPreferredName()); + builder.field(MovModel.MODEL.getPreferredName(), NAME); + builder.startObject(MovModel.SETTINGS.getPreferredName()); builder.field("alpha", alpha); builder.field("beta", beta); builder.endObject(); @@ -219,7 +217,7 @@ public boolean equals(Object obj) { } - public static class HoltLinearModelBuilder implements MovAvgModelBuilder { + public static class HoltLinearModelBuilder implements MovModelBuilder { private double alpha = DEFAULT_ALPHA; private double beta = DEFAULT_BETA; @@ -251,8 +249,8 @@ public HoltLinearModelBuilder beta(double beta) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); - builder.startObject(MovAvgPipelineAggregationBuilder.SETTINGS.getPreferredName()); + builder.field(MovModel.MODEL.getPreferredName(), NAME); + builder.startObject(MovModel.SETTINGS.getPreferredName()); builder.field("alpha", alpha); builder.field("beta", beta); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/HoltWintersModel.java similarity index 95% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/HoltWintersModel.java index 92b2e4d3ea26e..9d19a05caeed7 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/HoltWintersModel.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg.models; +package org.elasticsearch.search.aggregations.pipeline.moving.models; import org.elasticsearch.ElasticsearchParseException; @@ -27,7 +27,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationExecutionException; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; import java.io.IOException; import java.text.ParseException; @@ -258,16 +257,15 @@ public boolean hasValue(int valuesAvailable) { * * @param values Collection of numerics to movingAvg, usually windowed * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric * @return Returns an array of doubles, since most smoothing methods operate on floating points */ @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { return next(values, numPredictions); } @Override - public double next(Collection values) { + public double next(Collection values) { return next(values, 1)[0]; } @@ -363,8 +361,8 @@ public double[] next(Collection values, int numForecasts) @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); - builder.startObject(MovAvgPipelineAggregationBuilder.SETTINGS.getPreferredName()); + builder.field(MovModel.MODEL.getPreferredName(), NAME); + builder.startObject(MovModel.SETTINGS.getPreferredName()); builder.field("alpha", alpha); builder.field("beta", beta); builder.field("gamma", gamma); @@ -434,7 +432,7 @@ public boolean equals(Object obj) { && Objects.equals(pad, other.pad); } - public static class HoltWintersModelBuilder implements MovAvgModelBuilder { + public static class HoltWintersModelBuilder implements MovModelBuilder { private double alpha = DEFAULT_ALPHA; private double beta = DEFAULT_BETA; @@ -491,8 +489,8 @@ public HoltWintersModelBuilder pad(boolean pad) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); - builder.startObject(MovAvgPipelineAggregationBuilder.SETTINGS.getPreferredName()); + builder.field(MovModel.MODEL.getPreferredName(), NAME); + builder.startObject(MovModel.SETTINGS.getPreferredName()); builder.field("alpha", alpha); builder.field("beta", beta); builder.field("gamma", gamma); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/LinearModel.java similarity index 84% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/LinearModel.java index 3eed0bf603baa..4cb56611e5aa6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/LinearModel.java @@ -17,14 +17,13 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg.models; +package org.elasticsearch.search.aggregations.pipeline.moving.models; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; import java.io.IOException; import java.text.ParseException; @@ -74,7 +73,7 @@ public MovAvgModel clone() { } @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; // EWMA just emits the same final prediction repeatedly. @@ -84,13 +83,13 @@ protected double[] doPredict(Collection values, int numPr } @Override - public double next(Collection values) { + public double next(Collection values) { double avg = 0; long totalWeight = 1; long current = 1; - for (T v : values) { - avg += v.doubleValue() * current; + for (Double v : values) { + avg += v * current; totalWeight += current; current += 1; } @@ -99,7 +98,7 @@ public double next(Collection values) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); + builder.field(MovModel.MODEL.getPreferredName(), NAME); return builder; } @@ -111,10 +110,10 @@ public MovAvgModel parse(@Nullable Map settings, String pipeline } }; - public static class LinearModelBuilder implements MovAvgModelBuilder { + public static class LinearModelBuilder implements MovModelBuilder { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); + builder.field(MovModel.MODEL.getPreferredName(), NAME); return builder; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MaxModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MaxModel.java new file mode 100644 index 0000000000000..c9ed7ccea5245 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MaxModel.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.models; + + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.text.ParseException; +import java.util.Collection; +import java.util.Map; + +public class MaxModel extends MovModel { + public static final String NAME = "max"; + + public MaxModel() { + } + + /** + * Read from a stream. + */ + public MaxModel(StreamInput in) throws IOException { + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // Nothing to write + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public MovModel clone() { + return new MaxModel(); + } + + @Override + public double next(Collection values) { + return values.stream().mapToDouble(Double::doubleValue).max().orElse(Double.NaN); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + public static final AbstractModelParser PARSER = new AbstractModelParser() { + @Override + public MovModel parse(@Nullable Map settings, String pipelineName, int windowSize) throws ParseException { + checkUnrecognizedParams(settings); + return new MaxModel(); + } + }; + + public static class MaxModelBuilder implements MovModelBuilder { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + @Override + public MovModel build() { + return new MaxModel(); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MedianModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MedianModel.java new file mode 100644 index 0000000000000..90495712b43e2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MedianModel.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.models; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class MedianModel extends MovModel { + public static final String NAME = "median"; + + public MedianModel() { + } + + /** + * Read from a stream. + */ + public MedianModel(StreamInput in) throws IOException { + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // Nothing to write + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public MovModel clone() { + return new MedianModel(); + } + + @Override + public double next(Collection values) { + List listValues = new ArrayList<>(values); + Collections.sort(listValues); + if (values.size() % 2 == 0) { + int middle = (int)Math.floor(values.size()/2.0); + return (listValues.get(middle - 1) + listValues.get(middle) ) / 2; + } + + return listValues.get(listValues.size() / 2); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + public static final AbstractModelParser PARSER = new AbstractModelParser() { + @Override + public MovModel parse(@Nullable Map settings, String pipelineName, int windowSize) throws ParseException { + checkUnrecognizedParams(settings); + return new MedianModel(); + } + }; + + public static class MedianModelBuilder implements MovModelBuilder { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + @Override + public MovModel build() { + return new MedianModel(); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MinModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MinModel.java new file mode 100644 index 0000000000000..4d505a4ff5fc3 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MinModel.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.models; + + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.text.ParseException; +import java.util.Collection; +import java.util.Map; + +public class MinModel extends MovModel { + public static final String NAME = "min"; + + public MinModel() { + } + + /** + * Read from a stream. + */ + public MinModel(StreamInput in) throws IOException { + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // Nothing to write + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public MovModel clone() { + return new MinModel(); + } + + @Override + public double next(Collection values) { + return values.stream().mapToDouble(Double::doubleValue).min().orElse(Double.NaN); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + public static final AbstractModelParser PARSER = new AbstractModelParser() { + @Override + public MovModel parse(@Nullable Map settings, String pipelineName, int windowSize) throws ParseException { + checkUnrecognizedParams(settings); + return new MinModel(); + } + }; + + public static class MinModelBuilder implements MovModelBuilder { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + @Override + public MovModel build() { + return new MinModel(); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovAvgModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovAvgModel.java new file mode 100644 index 0000000000000..d323c9caa5626 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovAvgModel.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.models; + +import java.util.Arrays; +import java.util.Collection; + +public abstract class MovAvgModel extends MovModel { + + /** + * Should this model be fit to the data via a cost minimizing algorithm by default? + */ + public boolean minimizeByDefault() { + return false; + } + + /** + * Returns if the model can be cost minimized. Not all models have parameters + * which can be tuned / optimized. + */ + public abstract boolean canBeMinimized(); + + /** + * Generates a "neighboring" model, where one of the tunable parameters has been + * randomly mutated within the allowed range. Used for minimization + */ + public abstract MovAvgModel neighboringModel(); + + + + /** + * Predicts the next `n` values in the series. + * + * @param values Collection of numerics to movingAvg, usually windowed + * @param numPredictions Number of newly generated predictions to return + * @return Returns an array of doubles, since most smoothing methods operate on floating points + */ + public double[] predict(Collection values, int numPredictions) { + assert(numPredictions >= 1); + + // If there are no values, we can't do anything. Return an array of NaNs. + if (values.isEmpty()) { + return emptyPredictions(numPredictions); + } + + return doPredict(values, numPredictions); + } + + /** + * Calls to the model-specific implementation which actually generates the predictions + * + * @param values Collection of numerics to movingAvg, usually windowed + * @param numPredictions Number of newly generated predictions to return + * @return Returns an array of doubles, since most smoothing methods operate on floating points + */ + protected abstract double[] doPredict(Collection values, int numPredictions); + + /** + * Returns an empty set of predictions, filled with NaNs + * @param numPredictions Number of empty predictions to generate + */ + double[] emptyPredictions(int numPredictions) { + double[] predictions = new double[numPredictions]; + Arrays.fill(predictions, Double.NaN); + return predictions; + } + + +} + + + + diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovModel.java similarity index 66% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovModel.java index f64117236d6d4..9bdedee92f306 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovModel.java @@ -17,39 +17,25 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg.models; +package org.elasticsearch.search.aggregations.pipeline.moving.models; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ToXContent; import java.io.IOException; import java.text.ParseException; -import java.util.Arrays; import java.util.Collection; import java.util.Map; -public abstract class MovAvgModel implements NamedWriteable, ToXContent { +public abstract class MovModel implements NamedWriteable, ToXContent { - /** - * Should this model be fit to the data via a cost minimizing algorithm by default? - */ - public boolean minimizeByDefault() { - return false; - } - - /** - * Returns if the model can be cost minimized. Not all models have parameters - * which can be tuned / optimized. - */ - public abstract boolean canBeMinimized(); - - /** - * Generates a "neighboring" model, where one of the tunable parameters has been - * randomly mutated within the allowed range. Used for minimization - */ - public abstract MovAvgModel neighboringModel(); + public static final ParseField MODEL = new ParseField("model"); + public static final ParseField FUNCTION = new ParseField("function"); + public static final ParseField WINDOW = new ParseField("window"); + public static final ParseField SETTINGS = new ParseField("settings"); /** * Checks to see this model can produce a new value, without actually running the algo. @@ -67,50 +53,10 @@ public boolean hasValue(int valuesAvailable) { /** * Returns the next value in the series, according to the underlying smoothing model * - * @param values Collection of numerics to movingAvg, usually windowed - * @param Type of numeric + * @param values Collection of numerics to movingFn, usually windowed * @return Returns a double, since most smoothing methods operate on floating points */ - public abstract double next(Collection values); - - /** - * Predicts the next `n` values in the series. - * - * @param values Collection of numerics to movingAvg, usually windowed - * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric - * @return Returns an array of doubles, since most smoothing methods operate on floating points - */ - public double[] predict(Collection values, int numPredictions) { - assert(numPredictions >= 1); - - // If there are no values, we can't do anything. Return an array of NaNs. - if (values.isEmpty()) { - return emptyPredictions(numPredictions); - } - - return doPredict(values, numPredictions); - } - - /** - * Calls to the model-specific implementation which actually generates the predictions - * - * @param values Collection of numerics to movingAvg, usually windowed - * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric - * @return Returns an array of doubles, since most smoothing methods operate on floating points - */ - protected abstract double[] doPredict(Collection values, int numPredictions); - - /** - * Returns an empty set of predictions, filled with NaNs - * @param numPredictions Number of empty predictions to generate - */ - protected double[] emptyPredictions(int numPredictions) { - double[] predictions = new double[numPredictions]; - Arrays.fill(predictions, Double.NaN); - return predictions; - } + public abstract double next(Collection values); /** * Write the model to the output stream @@ -124,13 +70,18 @@ protected double[] emptyPredictions(int numPredictions) { * Clone the model, returning an exact copy */ @Override - public abstract MovAvgModel clone(); + public abstract MovModel clone(); - @Override - public abstract int hashCode(); + public int hashCode() { + return 0; + } - @Override - public abstract boolean equals(Object obj); + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + return getClass() == obj.getClass(); + } /** * Abstract class which also provides some concrete parsing functionality. @@ -141,10 +92,10 @@ public abstract static class AbstractModelParser { * * @param settings Map of settings, extracted from the request * @param pipelineName Name of the parent pipeline agg - * @param windowSize Size of the window for this moving avg + * @param windowSize Size of the window for this moving model * @return A fully built moving average model */ - public abstract MovAvgModel parse(@Nullable Map settings, String pipelineName, + public abstract MovModel parse(@Nullable Map settings, String pipelineName, int windowSize) throws ParseException; @@ -172,11 +123,11 @@ protected double parseDoubleParam(@Nullable Map settings, String } throw new ParseException("Parameter [" + name + "] must be between 0-1 inclusive. Provided" - + "value was [" + v + "]", 0); + + "value was [" + v + "]", 0); } throw new ParseException("Parameter [" + name + "] must be a double, type `" - + value.getClass().getSimpleName() + "` provided instead", 0); + + value.getClass().getSimpleName() + "` provided instead", 0); } /** @@ -201,7 +152,7 @@ protected int parseIntegerParam(@Nullable Map settings, String n } throw new ParseException("Parameter [" + name + "] must be an integer, type `" - + value.getClass().getSimpleName() + "` provided instead", 0); + + value.getClass().getSimpleName() + "` provided instead", 0); } /** @@ -226,7 +177,7 @@ protected boolean parseBoolParam(@Nullable Map settings, String } throw new ParseException("Parameter [" + name + "] must be a boolean, type `" - + value.getClass().getSimpleName() + "` provided instead", 0); + + value.getClass().getSimpleName() + "` provided instead", 0); } protected void checkUnrecognizedParams(@Nullable Map settings) throws ParseException { @@ -235,9 +186,4 @@ protected void checkUnrecognizedParams(@Nullable Map settings) t } } } - } - - - - diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModelBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovModelBuilder.java similarity index 87% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModelBuilder.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovModelBuilder.java index 5d858b48d16eb..5565db2ad2374 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModelBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/MovModelBuilder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg.models; +package org.elasticsearch.search.aggregations.pipeline.moving.models; import org.elasticsearch.common.xcontent.ToXContent; @@ -25,7 +25,7 @@ * Represents the common interface that all moving average models share. Moving * average models are used by the MovAvg aggregation */ -public interface MovAvgModelBuilder extends ToXContent { +public interface MovModelBuilder extends ToXContent { - MovAvgModel build(); + MovModel build(); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/SimpleModel.java similarity index 83% rename from core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java rename to core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/SimpleModel.java index e30a59d288711..76136030038d6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/SimpleModel.java @@ -17,13 +17,12 @@ * under the License. */ -package org.elasticsearch.search.aggregations.pipeline.movavg.models; +package org.elasticsearch.search.aggregations.pipeline.moving.models; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; import java.io.IOException; import java.text.ParseException; @@ -72,7 +71,7 @@ public MovAvgModel clone() { } @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; // Simple just emits the same final prediction repeatedly. @@ -82,17 +81,17 @@ protected double[] doPredict(Collection values, int numPre } @Override - public double next(Collection values) { + public double next(Collection values) { double avg = 0; - for (T v : values) { - avg += v.doubleValue(); + for (Double v : values) { + avg += v; } return avg / values.size(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); + builder.field(MovModel.MODEL.getPreferredName(), NAME); return builder; } @@ -104,10 +103,10 @@ public MovAvgModel parse(@Nullable Map settings, String pipeline } }; - public static class SimpleModelBuilder implements MovAvgModelBuilder { + public static class SimpleModelBuilder implements MovModelBuilder { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(MovAvgPipelineAggregationBuilder.MODEL.getPreferredName(), NAME); + builder.field(MovModel.MODEL.getPreferredName(), NAME); return builder; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/SumModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/SumModel.java new file mode 100644 index 0000000000000..5f69e041dc772 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/moving/models/SumModel.java @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.models; + + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.text.ParseException; +import java.util.Collection; +import java.util.Map; + +public class SumModel extends MovModel { + public static final String NAME = "sum"; + + public SumModel() { + } + + /** + * Read from a stream. + */ + public SumModel(StreamInput in) throws IOException { + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + // Nothing to write + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public MovModel clone() { + return new MinModel(); + } + + @Override + public double next(Collection values) { + return values.stream().mapToDouble(Double::doubleValue).sum(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + public static final AbstractModelParser PARSER = new AbstractModelParser() { + @Override + public MovModel parse(@Nullable Map settings, String pipelineName, int windowSize) throws ParseException { + checkUnrecognizedParams(settings); + return new MinModel(); + } + }; + + public static class MinModelBuilder implements MovModelBuilder { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovModel.FUNCTION.getPreferredName(), NAME); + return builder; + } + + @Override + public MovModel build() { + return new MinModel(); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java b/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java index 85b13974042e0..5f5d7004365af 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchModuleTests.java @@ -44,8 +44,8 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java index 5561b61f49288..fecb6cd93dc70 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java @@ -33,7 +33,7 @@ import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.derivative.Derivative; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.test.ESIntegTestCase; import org.hamcrest.Matchers; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java index bbe6ecc3a4e68..115abc30f2fec 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java @@ -33,18 +33,17 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregationHelperTests; import org.elasticsearch.search.aggregations.pipeline.SimpleValue; import org.elasticsearch.search.aggregations.pipeline.derivative.Derivative; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.EwmaModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltLinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltWintersModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.LinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModelBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.test.ESIntegTestCase; import org.hamcrest.Matchers; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -61,6 +60,7 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.range; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.derivative; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.movingAvg; +import static org.elasticsearch.test.hamcrest.DoubleMatcher.nearlyEqual; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -1183,13 +1183,13 @@ public void testCheckIfNonTunableCanBeMinimized() { * These models are all minimizable, so they should not throw exceptions */ public void testCheckIfTunableCanBeMinimized() { - MovAvgModelBuilder[] builders = new MovAvgModelBuilder[]{ + MovModelBuilder[] builders = new MovModelBuilder[]{ new EwmaModel.EWMAModelBuilder(), new HoltLinearModel.HoltLinearModelBuilder(), new HoltWintersModel.HoltWintersModelBuilder() }; - for (MovAvgModelBuilder builder : builders) { + for (MovModelBuilder builder : builders) { try { client() .prepareSearch("idx").setTypes("type") @@ -1309,32 +1309,11 @@ private void assertBucketContents(Histogram.Bucket actual, Double expectedCount, } } - /** - * Better floating point comparisons courtesy of https://github.com/brazzy/floating-point-gui.de - * - * Snippet adapted to use doubles instead of floats - */ - private static boolean nearlyEqual(double a, double b, double epsilon) { - final double absA = Math.abs(a); - final double absB = Math.abs(b); - final double diff = Math.abs(a - b); - - if (a == b) { // shortcut, handles infinities - return true; - } else if (a == 0 || b == 0 || diff < Double.MIN_NORMAL) { - // a or b is zero or both are extremely close to it - // relative error is less meaningful here - return diff < (epsilon * Double.MIN_NORMAL); - } else { // use relative error - return diff / Math.min((absA + absB), Double.MAX_VALUE) < epsilon; - } - } - - private MovAvgModelBuilder randomModelBuilder() { + private MovModelBuilder randomModelBuilder() { return randomModelBuilder(0); } - private MovAvgModelBuilder randomModelBuilder(double padding) { + private MovModelBuilder randomModelBuilder(double padding) { int rand = randomIntBetween(0,3); // HoltWinters is excluded from random generation, because it's "cold start" behavior makes diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java index 869a7cd58ed8e..499c603e99045 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java @@ -23,13 +23,13 @@ import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel.SeasonalityType; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.models.EwmaModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltLinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltWintersModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltWintersModel.SeasonalityType; +import org.elasticsearch.search.aggregations.pipeline.moving.models.LinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel; public class MovAvgTests extends BasePipelineAggregationTestCase { diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java index 34d203443604a..672c50573d442 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java @@ -20,12 +20,12 @@ package org.elasticsearch.search.aggregations.pipeline.moving.avg; import org.elasticsearch.common.collect.EvictingQueue; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; -import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.EwmaModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltLinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltWintersModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.LinearModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel; import org.elasticsearch.test.ESTestCase; import java.text.ParseException; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnIT.java new file mode 100644 index 0000000000000..0134964266dc4 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnIT.java @@ -0,0 +1,454 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.function; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.collect.EvictingQueue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.MockScriptPlugin; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregationHelperTests; +import org.elasticsearch.search.aggregations.pipeline.SimpleValue; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MaxModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MedianModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MinModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SumModel; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.test.ESIntegTestCase; +import org.hamcrest.Matchers; +import org.hamcrest.core.IsNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.avg; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.max; +import static org.elasticsearch.search.aggregations.AggregationBuilders.min; +import static org.elasticsearch.test.hamcrest.DoubleMatcher.nearlyEqual; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.movingFunction; + +@ESIntegTestCase.SuiteScopeTestCase +public class MovFnIT extends ESIntegTestCase { + + private static final String INTERVAL_FIELD = "l_value"; + private static final String VALUE_FIELD = "v_value"; + + static int interval; + static int numBuckets; + static int windowSize; + static BucketHelpers.GapPolicy gapPolicy; + static List mockHisto; + static ValuesSourceAggregationBuilder> metric; + static Map> testValues; + + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(CustomScriptPlugin.class); + } + + public static class CustomScriptPlugin extends MockScriptPlugin { + + @Override + protected Map, Object>> pluginScripts() { + Map, Object>> scripts = new HashMap<>(); + + scripts.put("return 1;", vars -> 1); + return scripts; + } + } + + enum MovFnType { + MIN("min"), MAX("max"), MEDIAN("median"), SUM("sum"), SCRIPT("script"); + + private final String name; + + MovFnType(String s) { + name = s; + } + + @Override + public String toString() { + return name; + } + } + + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + List builders = new ArrayList<>(); + + interval = 5; + numBuckets = randomIntBetween(6, 10); + windowSize = randomIntBetween(2, 4); + metric = randomMetric("the_metric", VALUE_FIELD); + gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS; + mockHisto = PipelineAggregationHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble()); + + testValues = new HashMap<>(8); + + for (MovFnType type : MovFnType.values()) { + setupExpected(type, windowSize); + } + + for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) { + for (double value : mockBucket.docValues) { + builders.add(client().prepareIndex("idx", "type").setSource(jsonBuilder().startObject() + .field(INTERVAL_FIELD, mockBucket.key) + .field(VALUE_FIELD, value).endObject())); + } + } + + indexRandom(true, builders); + ensureSearchable(); + } + + /** + * Calculates the moving fn for a specific model based on the previously generated mock histogram. + * Computed values are stored in the testValues map. + * + * @param type The moving average model to use + */ + private void setupExpected(MovFnType type, int windowSize) { + ArrayList values = new ArrayList<>(numBuckets); + EvictingQueue window = new EvictingQueue<>(windowSize); + + for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) { + double metricValue; + double[] docValues = mockBucket.docValues; + + if (mockBucket.count == 0) { + // If there was a gap in doc counts and we are ignoring, just skip this bucket + if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) { + values.add(null); + continue; + } else { + // otherwise insert a zero instead of the true value + metricValue = 0.0; + } + + } else { + // If this isn't a gap, just insert the value + metricValue = PipelineAggregationHelperTests.calculateMetric(docValues, metric); + } + + if (window.size() > 0) { + switch (type) { + case MIN: + values.add(window.stream().mapToDouble(v -> v).min().orElse(Double.NaN)); + break; + case MAX: + values.add(window.stream().mapToDouble(v -> v).max().orElse(Double.NaN)); + break; + case SUM: + values.add(window.stream().mapToDouble(v -> v).sum()); + break; + case MEDIAN: + values.add(median(window)); + break; + case SCRIPT: + values.add(1.0); + break; + } + } else { + values.add(null); + } + + window.offer(metricValue); + + } + testValues.put(type.name(), values); + } + + private double median(Collection window) { + List listValues = new ArrayList<>(window); + Collections.sort(listValues); + if (window.size() % 2 == 0) { + int middle = (int) Math.floor(window.size() / 2.0); + return (listValues.get(middle - 1) + listValues.get(middle)) / 2; + } + + return listValues.get(listValues.size() / 2); + } + + public void testMin() { + SearchResponse response = client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingFunction("movfn_values", "the_metric", null) + .window(windowSize) + .modelBuilder(new MinModel.MinModelBuilder()) + .gapPolicy(gapPolicy)) + ).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); + + List expectedValues = testValues.get(MovFnType.MIN.name()); + + Iterator actualIter = buckets.iterator(); + Iterator expectedBucketIter = mockHisto.iterator(); + Iterator expectedValuesIter = expectedValues.iterator(); + + while (actualIter.hasNext()) { + assertValidIterators(expectedBucketIter, expectedValuesIter); + + Histogram.Bucket actual = actualIter.next(); + PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next(); + Double expectedValue = expectedValuesIter.next(); + + assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key)); + assertThat("doc counts do not match", actual.getDocCount(), equalTo((long) expected.count)); + + assertBucketContents(actual, expectedValue); + } + } + + public void testMax() { + SearchResponse response = client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingFunction("movfn_values", "the_metric", null) + .window(windowSize) + .modelBuilder(new MaxModel.MaxModelBuilder()) + .gapPolicy(gapPolicy)) + ).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); + + List expectedValues = testValues.get(MovFnType.MAX.name()); + + Iterator actualIter = buckets.iterator(); + Iterator expectedBucketIter = mockHisto.iterator(); + Iterator expectedValuesIter = expectedValues.iterator(); + + while (actualIter.hasNext()) { + assertValidIterators(expectedBucketIter, expectedValuesIter); + + Histogram.Bucket actual = actualIter.next(); + PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next(); + Double expectedValue = expectedValuesIter.next(); + + assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key)); + assertThat("doc counts do not match", actual.getDocCount(), equalTo((long) expected.count)); + + assertBucketContents(actual, expectedValue); + } + } + + public void testMedian() { + SearchResponse response = client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingFunction("movfn_values", "the_metric", null) + .window(windowSize) + .modelBuilder(new MedianModel.MedianModelBuilder()) + .gapPolicy(gapPolicy)) + ).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); + + List expectedValues = testValues.get(MovFnType.MEDIAN.name()); + + Iterator actualIter = buckets.iterator(); + Iterator expectedBucketIter = mockHisto.iterator(); + Iterator expectedValuesIter = expectedValues.iterator(); + + while (actualIter.hasNext()) { + assertValidIterators(expectedBucketIter, expectedValuesIter); + + Histogram.Bucket actual = actualIter.next(); + PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next(); + Double expectedValue = expectedValuesIter.next(); + + assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key)); + assertThat("doc counts do not match", actual.getDocCount(), equalTo((long) expected.count)); + + assertBucketContents(actual, expectedValue); + } + } + + public void testSum() { + SearchResponse response = client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingFunction("movfn_values", "the_metric", null) + .window(windowSize) + .modelBuilder(new SumModel.MinModelBuilder()) + .gapPolicy(gapPolicy)) + ).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); + + List expectedValues = testValues.get(MovFnType.SUM.name()); + + Iterator actualIter = buckets.iterator(); + Iterator expectedBucketIter = mockHisto.iterator(); + Iterator expectedValuesIter = expectedValues.iterator(); + + while (actualIter.hasNext()) { + assertValidIterators(expectedBucketIter, expectedValuesIter); + + Histogram.Bucket actual = actualIter.next(); + PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next(); + Double expectedValue = expectedValuesIter.next(); + + assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key)); + assertThat("doc counts do not match", actual.getDocCount(), equalTo((long) expected.count)); + + assertBucketContents(actual, expectedValue); + } + } + + public void testScript() { + Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "return 1;", Collections.emptyMap()); + SearchResponse response = client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingFunction("movfn_values", "the_metric", null) + .window(windowSize) + .setScript(script) + .gapPolicy(gapPolicy)) + ).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); + + List expectedValues = testValues.get(MovFnType.SCRIPT.name()); + + Iterator actualIter = buckets.iterator(); + Iterator expectedBucketIter = mockHisto.iterator(); + Iterator expectedValuesIter = expectedValues.iterator(); + + while (actualIter.hasNext()) { + assertValidIterators(expectedBucketIter, expectedValuesIter); + + Histogram.Bucket actual = actualIter.next(); + PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next(); + Double expectedValue = expectedValuesIter.next(); + + assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key)); + assertThat("doc counts do not match", actual.getDocCount(), equalTo((long) expected.count)); + + assertBucketContents(actual, expectedValue); + } + } + + private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedValuesIter) { + if (!expectedBucketIter.hasNext()) { + fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch"); + } + if (!expectedValuesIter.hasNext()) { + fail("`expectedValuesIter` iterator ended before `actual` iterator, size mismatch"); + } + } + + private void assertBucketContents(Histogram.Bucket actual, Double expectedValue) { + + SimpleValue values = actual.getAggregations().get("movfn_values"); + if (expectedValue == null) { + assertThat("[value] moving is not null", values, Matchers.nullValue()); + } else if (Double.isNaN(expectedValue)) { + assertThat("[value] moving should be NaN, but is [" + values.value() + "] instead", + values.value(), Matchers.equalTo(Double.NaN)); + } else { + assertThat("[value] moving is null", values, IsNull.notNullValue()); + assertTrue("[value] moving does not match expected [" + values.value() + " vs " + expectedValue + "]", + nearlyEqual(values.value(), expectedValue, 0.1)); + } + } + + private static ValuesSourceAggregationBuilder> randomMetric(String name, String field) { + int rand = randomIntBetween(0, 3); + + switch (rand) { + case 0: + return min(name).field(field); + case 2: + return max(name).field(field); + case 3: + return avg(name).field(field); + default: + return avg(name).field(field); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnTests.java new file mode 100644 index 0000000000000..ecda4adc66e2d --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnTests.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.function; + +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers; +import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MaxModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MedianModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MinModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel; + +import java.util.HashMap; +import java.util.Map; + +public class MovFnTests extends BasePipelineAggregationTestCase { + + @Override + protected MovFunctionPipelineAggregationBuilder createTestAggregatorFactory() { + String name = randomAlphaOfLengthBetween(3, 20); + String bucketsPath = randomAlphaOfLengthBetween(3, 20); + Script script; + MovModel model; + + if (randomBoolean()) { + model = null; + if (randomBoolean()) { + script = mockScript("script"); + } else { + Map params = new HashMap<>(); + if (randomBoolean()) { + params.put("foo", "bar"); + } + ScriptType type = randomFrom(ScriptType.values()); + script = new Script(type, type == ScriptType.STORED + ? null + : randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG), + "script", params); + } + } else { + script = null; + switch (randomInt(3)) { + case 0: + model = new MinModel(); + break; + case 1: + model = new MaxModel(); + break; + case 2: + model = new MedianModel(); + break; + default: + model = new MinModel(); + break; + } + } + MovFunctionPipelineAggregationBuilder factory + = new MovFunctionPipelineAggregationBuilder(name, bucketsPath, script); + if (model != null) { + factory.function(model); + } + if (randomBoolean()) { + factory.window(randomIntBetween(1, 100)); + } + if (randomBoolean()) { + factory.format(randomAlphaOfLengthBetween(1, 10)); + } + if (randomBoolean()) { + factory.gapPolicy(randomFrom(BucketHelpers.GapPolicy.values())); + } + return factory; + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnUnitTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnUnitTests.java new file mode 100644 index 0000000000000..a5d58b4910fd0 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/function/MovFnUnitTests.java @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.moving.function; + +import org.elasticsearch.common.collect.EvictingQueue; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MaxModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MedianModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MinModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel; +import org.elasticsearch.search.aggregations.pipeline.moving.models.SumModel; +import org.elasticsearch.test.ESTestCase; + +import static org.elasticsearch.test.hamcrest.DoubleMatcher.nearlyEqual; + +public class MovFnUnitTests extends ESTestCase { + public void testMinFn() { + MovModel model = new MinModel(); + + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double expected = Double.MAX_VALUE; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + expected = Math.min(expected, value); + } + + double actual = model.next(window); + assertTrue("actual does not match expected [" + actual + " vs " + expected + "]", + nearlyEqual(actual, expected, 0.1)); + window.offer(randValue); + } + } + + public void testMaxFn() { + MovModel model = new MaxModel(); + + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double expected = -Double.MAX_VALUE; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + expected = Math.max(expected, value); + } + + double actual = model.next(window); + assertTrue("actual does not match expected [" + actual + " vs " + expected + "]", + nearlyEqual(actual, expected, 0.1)); + window.offer(randValue); + } + } + + public void testMedianOddFn() { + MovModel model = new MedianModel(); + + int windowSize = 11; + + double[] values = {1,10,3,9,5,6,7,8,4,2,11}; + EvictingQueue window = new EvictingQueue<>(windowSize); + for (double v : values) { + window.offer(v); + } + + double actual = model.next(window); + + assertEquals(6, actual, 0.0001); + } + + public void testMedianEvenFn() { + MovModel model = new MedianModel(); + + int windowSize = 11; + + double[] values = {1,10,3,9,5,6,7,8,4,2}; + EvictingQueue window = new EvictingQueue<>(windowSize); + for (double v : values) { + window.offer(v); + } + + double actual = model.next(window); + + assertEquals(5.5, actual, 0.0001); + } + + public void testSumFn() { + MovModel model = new SumModel(); + + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double expected = 0; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + expected += value; + } + + double actual = model.next(window); + assertTrue("actual does not match expected [" + actual + " vs " + expected + "]", + nearlyEqual(actual, expected, 0.1)); + window.offer(randValue); + } + } +} diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index 0f3573eaa9ad2..91d13508260e3 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -233,6 +233,7 @@ include::pipeline/stats-bucket-aggregation.asciidoc[] include::pipeline/extended-stats-bucket-aggregation.asciidoc[] include::pipeline/percentiles-bucket-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[] +include::pipeline/movfn-aggregation.asciidoc[] include::pipeline/cumulative-sum-aggregation.asciidoc[] include::pipeline/bucket-script-aggregation.asciidoc[] include::pipeline/bucket-selector-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc new file mode 100644 index 0000000000000..c3cbb91d9f215 --- /dev/null +++ b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc @@ -0,0 +1,322 @@ +[[search-aggregations-pipeline-movavg-aggregation]] +=== Moving Function Aggregation + +experimental[] + +Given an ordered series of data, the Moving Function aggregation will slide a window across the data and emit the +user-defined function calculated for that window. There are several pre-built functions available, as well as the +ability to use a script. The pre-built functions are: + +- Min +- Max +- Median +- Sum + +For example, selecting the `min` model will find the minimum value in each window. + + +==== Syntax + +A `moving_fn` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "moving_fn": { + "buckets_path": "the_sum", + "window": 5, + "gap_policy": "skip", + "function": "min", <1> + "script": {...} <1> + } +} +-------------------------------------------------- +// NOTCONSOLE +<1> `model` and `script` are mutually exclusive, but shown here for posterity + +.`moving_avg` Parameters +|=== +|Parameter Name |Description |Required |Default Value +|`buckets_path` |Path to the metric of interest (see <> for more details |Required | +|`function` |The function that we wish to use on each window |Required if not using `script` | +|`gap_policy` |Determines what should happen when a gap in the data is encountered. |Optional |`insert_zeros` +|`window` |The size of window to "slide" across the histogram. |Optional |`5` +|`script` |User-defined script to execute on each window. Must return a number |Required if not using `function` | +|=== + +`moving_fn` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation. They can be +embedded like any other metric aggregation: + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ <1> + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } <2> + }, + "the_movfn":{ + "moving_fn":{ <3> + "buckets_path": "the_sum", + "function": "max" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +<1> A `date_histogram` named "my_date_histo" is constructed on the "timestamp" field, with one-day intervals +<2> A `sum` metric is used to calculate the sum of a field. This could be any metric (sum, min, max, etc) +<3> Finally, we specify a `moving_fn` aggregation which uses "the_sum" metric as its input. + +Moving functions are built by first specifying a `histogram` or `date_histogram` over a field. You can then optionally +add normal metrics, such as a `sum`, inside of that histogram. Finally, the `moving_fn` is embedded inside the histogram. +The `buckets_path` parameter is then used to "point" at one of the sibling metrics inside of the histogram (see +<> for a description of the syntax for `buckets_path`. + +An example response from the above aggregation may look like: + +[source,js] +-------------------------------------------------- +{ + "took": 11, + "timed_out": false, + "_shards": ..., + "hits": ..., + "aggregations": { + "my_date_histo": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "the_sum": { + "value": 550.0 + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2, + "the_sum": { + "value": 60.0 + }, + "the_movfn": { + "value": 550.0 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "the_sum": { + "value": 375.0 + }, + "the_movfn": { + "value": 550.0 + } + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"took": 11/"took": $body.took/] +// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/] +// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/] + + +==== Functions + +The `moving_fn` aggregation includes four different moving pre-built functions, which are specified +using the `function` parameter. + +===== Min + +Finds the minimum value in the window of values: + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movfn":{ + "moving_fn":{ + "buckets_path": "the_sum", + "window" : 30, + "function" : "min" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +===== Max + +Finds the maximum value in the window of values: + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movfn":{ + "moving_fn":{ + "buckets_path": "the_sum", + "window" : 30, + "function" : "max" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +===== Sum + +Finds the sum of all the value in the window: + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movfn":{ + "moving_fn":{ + "buckets_path": "the_sum", + "window" : 30, + "function" : "sum" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +===== Median + +Finds the median of the values in the window, e.g. the point that separates the top from the bottom half of the data: + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movfn":{ + "moving_fn":{ + "buckets_path": "the_sum", + "window" : 30, + "function" : "median" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +===== Script + +A user-defined script can also be used, in place of the pre-built functions. The script is executed +once for each window of values, and must return a numeric value. The values for the window will be provided +in the `params.values` parameter. + +*Note:* scripts are mutually exclusive with the `function` parameter; if you specify a script, you may not +specify a function. + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movfn":{ + "moving_fn":{ + "buckets_path": "the_sum", + "window" : 30, + "script" : { + "lang": "painless", + "inline": "def sum = 0; for (Double v : params.values) {sum += v;} return sum;" <1> + } + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] +<1> Calculates the sum of the window using a script instead of the `sum` function \ No newline at end of file