From 08c5ed58987b63e20cdc60371d9c33549e62dbfb Mon Sep 17 00:00:00 2001 From: Nikita_Glashchenko Date: Mon, 15 Jul 2019 21:12:01 +0400 Subject: [PATCH 1/7] Introduce `shift` field in `MovFnPipelineAggregator` --- .../MovFnPipelineAggregationBuilder.java | 9 +++- .../pipeline/MovFnPipelineAggregator.java | 36 ++++++++++++-- .../aggregations/pipeline/MovFnUnitTests.java | 47 ++++++++++++------- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java index 44f26c3c32bf8..2cc4b0817c988 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java @@ -54,6 +54,7 @@ public class MovFnPipelineAggregationBuilder extends AbstractPipelineAggregation private String format = null; private GapPolicy gapPolicy = GapPolicy.SKIP; private int window; + private int shift; private static final Function> PARSER = name -> { @@ -168,9 +169,13 @@ public void setWindow(int window) { this.window = window; } + public void setShift(int shift) { + this.shift = shift; + } + @Override public void doValidate(AggregatorFactory parent, Collection aggFactories, - Collection pipelineAggregatoractories) { + Collection pipelineAggregatorFactories) { if (window <= 0) { throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer."); } @@ -180,7 +185,7 @@ public void doValidate(AggregatorFactory parent, Collection @Override protected PipelineAggregator createInternal(Map metaData) { - return new MovFnPipelineAggregator(name, bucketsPathString, script, window, formatter(), gapPolicy, metaData); + return new MovFnPipelineAggregator(name, bucketsPathString, script, window, shift, formatter(), gapPolicy, metaData); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 4f14df2d66d12..797e5a97c13be 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -63,8 +63,9 @@ public class MovFnPipelineAggregator extends PipelineAggregator { private final Script script; private final String bucketsPath; private final int window; + private final int shift; - MovFnPipelineAggregator(String name, String bucketsPath, Script script, int window, DocValueFormat formatter, + MovFnPipelineAggregator(String name, String bucketsPath, Script script, int window, int shift, DocValueFormat formatter, BucketHelpers.GapPolicy gapPolicy, Map metadata) { super(name, new String[]{bucketsPath}, metadata); this.bucketsPath = bucketsPath; @@ -72,6 +73,7 @@ public class MovFnPipelineAggregator extends PipelineAggregator { this.formatter = formatter; this.gapPolicy = gapPolicy; this.window = window; + this.shift = shift; } public MovFnPipelineAggregator(StreamInput in) throws IOException { @@ -81,6 +83,7 @@ public MovFnPipelineAggregator(StreamInput in) throws IOException { gapPolicy = BucketHelpers.GapPolicy.readFrom(in); bucketsPath = in.readString(); window = in.readInt(); + shift = 0; // todo } @Override @@ -90,6 +93,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { gapPolicy.writeTo(out); out.writeString(bucketsPath); out.writeInt(window); + // todo } @Override @@ -106,7 +110,6 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre HistogramFactory factory = (HistogramFactory) histo; List newBuckets = new ArrayList<>(); - EvictingQueue values = new EvictingQueue<>(this.window); // Initialize the script MovingFunctionScript.Factory scriptFactory = reduceContext.scriptService().compile(script, MovingFunctionScript.CONTEXT); @@ -117,6 +120,12 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre MovingFunctionScript executableScript = scriptFactory.newInstance(); + List values = buckets.stream() + .map(b -> resolveBucketValue(histo, b, bucketsPaths()[0], gapPolicy)) + .filter(v -> v != null && !v.isNaN()) + .collect(Collectors.toList()); + + int index = 0; for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) { Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); @@ -124,11 +133,18 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre // since we only change newBucket if we can add to it MultiBucketsAggregation.Bucket newBucket = bucket; - if (thisBucketValue != null && thisBucketValue.equals(Double.NaN) == false) { + if (thisBucketValue != null && !thisBucketValue.isNaN()) { // The custom context mandates that the script returns a double (not Double) so we // don't need null checks, etc. - double movavg = executableScript.execute(vars, values.stream().mapToDouble(Double::doubleValue).toArray()); + int fromIndex = clamp(index - window + shift, values); + int toIndex = clamp(index + shift, values); + double movavg = executableScript.execute( + vars, + values.subList(fromIndex, toIndex).stream() + .mapToDouble(Double::doubleValue) + .toArray() + ); List aggs = StreamSupport .stream(bucket.getAggregations().spliterator(), false) @@ -136,11 +152,21 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre .collect(Collectors.toList()); aggs.add(new InternalSimpleValue(name(), movavg, formatter, new ArrayList<>(), metaData())); newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs)); - values.offer(thisBucketValue); + index++; } newBuckets.add(newBucket); } return factory.createAggregation(newBuckets); } + + private int clamp(int index, List list) { + if (index < 0) { + return 0; + } + if (index > list.size()) { + return list.size(); + } + return index; + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java index 27490fa202bda..719f69bce905e 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java @@ -46,13 +46,9 @@ import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.function.Consumer; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; @@ -79,25 +75,42 @@ public class MovFnUnitTests extends AggregatorTestCase { private static final List datasetValues = Arrays.asList(1,2,3,4,5,6,7,8,9,10); public void testMatchAllDocs() throws IOException { - Query query = new MatchAllDocsQuery(); + test(0, List.of(Double.NaN, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0)); + } + + public void testShift() throws IOException { + test(1, List.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0)); + test(5, List.of(5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 10.0, 10.0, Double.NaN, Double.NaN)); + test(-5, List.of(Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, 1.0, 2.0, 3.0, 4.0)); + } + + public void testWideWindow() throws IOException { Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap()); + MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 100); + builder.setShift(50); + test(builder, script, List.of(10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0)); + } + private void test(int shift, List expected) throws IOException { + Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap()); + MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 3); + builder.setShift(shift); + test(builder, script, expected); + } + + private void test(MovFnPipelineAggregationBuilder builder, Script script, List expected) throws IOException { + Query query = new MatchAllDocsQuery(); DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo"); aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD); aggBuilder.subAggregation(new AvgAggregationBuilder("avg").field(VALUE_FIELD)); - aggBuilder.subAggregation(new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 3)); + aggBuilder.subAggregation(builder); executeTestCase(query, aggBuilder, histogram -> { - assertEquals(10, histogram.getBuckets().size()); List buckets = histogram.getBuckets(); - for (int i = 0; i < buckets.size(); i++) { - if (i == 0) { - assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(Double.NaN)); - } else { - assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(((double) i))); - } - - } + List actual = buckets.stream() + .map(bucket -> ((InternalSimpleValue) (bucket.getAggregations().get("mov_fn"))).value()) + .collect(Collectors.toList()); + assertThat(actual, equalTo(expected)); }, 1000, script); } From 7d39dc4c6d9372f361c19cc23421d30fa2583609 Mon Sep 17 00:00:00 2001 From: Nikita_Glashchenko Date: Tue, 16 Jul 2019 15:23:14 +0400 Subject: [PATCH 2/7] Fix imports --- .../aggregations/pipeline/MovFnPipelineAggregator.java | 1 - .../search/aggregations/pipeline/MovFnUnitTests.java | 7 ++++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 797e5a97c13be..84771ef73ad4c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -19,7 +19,6 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.script.Script; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java index 719f69bce905e..df566611b2d93 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java @@ -46,7 +46,12 @@ import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; From a38b1f97cde56034f5bea3d90b14058af489276a Mon Sep 17 00:00:00 2001 From: Nikita_Glashchenko Date: Tue, 16 Jul 2019 18:39:38 +0400 Subject: [PATCH 3/7] Parse `shift` field --- .../pipeline/MovFnPipelineAggregationBuilder.java | 10 ++++++++-- .../aggregations/pipeline/MovFnPipelineAggregator.java | 4 ++-- ...FnPipelineAggregationBuilderSerializationTests.java | 10 ++++++++-- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java index 2cc4b0817c988..d01d413b136d1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java @@ -48,6 +48,7 @@ public class MovFnPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { public static final String NAME = "moving_fn"; private static final ParseField WINDOW = new ParseField("window"); + private static final ParseField SHIFT = new ParseField("shift"); private final Script script; private final String bucketsPathString; @@ -69,6 +70,7 @@ public class MovFnPipelineAggregationBuilder extends AbstractPipelineAggregation (p, c) -> Script.parse(p), Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING); parser.declareInt(ConstructingObjectParser.constructorArg(), WINDOW); + parser.declareInt(MovFnPipelineAggregationBuilder::setShift, SHIFT); parser.declareString(MovFnPipelineAggregationBuilder::format, FORMAT); parser.declareField(MovFnPipelineAggregationBuilder::gapPolicy, p -> { if (p.currentToken() == XContentParser.Token.VALUE_STRING) { @@ -98,6 +100,7 @@ public MovFnPipelineAggregationBuilder(StreamInput in) throws IOException { format = in.readOptionalString(); gapPolicy = GapPolicy.readFrom(in); window = in.readInt(); + shift = in.readInt(); } @Override @@ -107,6 +110,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeOptionalString(format); gapPolicy.writeTo(out); out.writeInt(window); + out.writeInt(shift); } /** @@ -197,6 +201,7 @@ protected XContentBuilder internalXContent(XContentBuilder builder, Params param } builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName()); builder.field(WINDOW.getPreferredName(), window); + builder.field(SHIFT.getPreferredName(), shift); return builder; } @@ -230,7 +235,7 @@ protected boolean overrideBucketsPath() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), bucketsPathString, script, format, gapPolicy, window); + return Objects.hash(super.hashCode(), bucketsPathString, script, format, gapPolicy, window, shift); } @Override @@ -243,7 +248,8 @@ public boolean equals(Object obj) { && Objects.equals(script, other.script) && Objects.equals(format, other.format) && Objects.equals(gapPolicy, other.gapPolicy) - && Objects.equals(window, other.window); + && Objects.equals(window, other.window) + && Objects.equals(shift, other.shift); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 84771ef73ad4c..63dcecaca2a27 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -82,7 +82,7 @@ public MovFnPipelineAggregator(StreamInput in) throws IOException { gapPolicy = BucketHelpers.GapPolicy.readFrom(in); bucketsPath = in.readString(); window = in.readInt(); - shift = 0; // todo + shift = in.readInt(); } @Override @@ -92,7 +92,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { gapPolicy.writeTo(out); out.writeString(bucketsPath); out.writeInt(window); - // todo + out.writeInt(shift); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilderSerializationTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilderSerializationTests.java index 49923640805bd..cb1e2d5249b4a 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilderSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilderSerializationTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.script.Script; -import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; @@ -31,7 +30,14 @@ public class MovFnPipelineAggregationBuilderSerializationTests extends AbstractS @Override protected MovFnPipelineAggregationBuilder createTestInstance() { - return new MovFnPipelineAggregationBuilder(randomAlphaOfLength(10), "foo", new Script("foo"), randomIntBetween(1, 10)); + MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder( + randomAlphaOfLength(10), + "foo", + new Script("foo"), + randomIntBetween(1, 10) + ); + builder.setShift(randomIntBetween(1, 10)); + return builder; } @Override From 82fa5dc33a8995523fb6c61f8b7a48b92a5558fe Mon Sep 17 00:00:00 2001 From: Nikita_Glashchenko Date: Tue, 16 Jul 2019 19:19:55 +0400 Subject: [PATCH 4/7] Make serialization cross-version compatible --- .../pipeline/MovFnPipelineAggregationBuilder.java | 11 +++++++++-- .../pipeline/MovFnPipelineAggregator.java | 11 +++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java index d01d413b136d1..5cacdf5bdb6da 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -100,7 +101,11 @@ public MovFnPipelineAggregationBuilder(StreamInput in) throws IOException { format = in.readOptionalString(); gapPolicy = GapPolicy.readFrom(in); window = in.readInt(); - shift = in.readInt(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + shift = in.readInt(); + } else { + shift = 0; + } } @Override @@ -110,7 +115,9 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeOptionalString(format); gapPolicy.writeTo(out); out.writeInt(window); - out.writeInt(shift); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeInt(shift); + } } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 63dcecaca2a27..97aa019a925b9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.script.Script; @@ -82,7 +83,11 @@ public MovFnPipelineAggregator(StreamInput in) throws IOException { gapPolicy = BucketHelpers.GapPolicy.readFrom(in); bucketsPath = in.readString(); window = in.readInt(); - shift = in.readInt(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + shift = in.readInt(); + } else { + shift = 0; + } } @Override @@ -92,7 +97,9 @@ protected void doWriteTo(StreamOutput out) throws IOException { gapPolicy.writeTo(out); out.writeString(bucketsPath); out.writeInt(window); - out.writeInt(shift); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeInt(shift); + } } @Override From 2648fde2edd3677a6e816cbbab87b36aa2e4e913 Mon Sep 17 00:00:00 2001 From: Nikita_Glashchenko Date: Tue, 16 Jul 2019 19:22:35 +0400 Subject: [PATCH 5/7] Fix style --- .../pipeline/MovFnPipelineAggregator.java | 4 ++-- .../aggregations/pipeline/MovFnUnitTests.java | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 97aa019a925b9..9b1c8dbe98b18 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -128,7 +128,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre List values = buckets.stream() .map(b -> resolveBucketValue(histo, b, bucketsPaths()[0], gapPolicy)) - .filter(v -> v != null && !v.isNaN()) + .filter(v -> v != null && v.isNaN() == false) .collect(Collectors.toList()); int index = 0; @@ -139,7 +139,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, InternalAggre // since we only change newBucket if we can add to it MultiBucketsAggregation.Bucket newBucket = bucket; - if (thisBucketValue != null && !thisBucketValue.isNaN()) { + if (thisBucketValue != null && thisBucketValue.isNaN() == false) { // The custom context mandates that the script returns a double (not Double) so we // don't need null checks, etc. diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java index df566611b2d93..862f556455534 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/MovFnUnitTests.java @@ -80,30 +80,30 @@ public class MovFnUnitTests extends AggregatorTestCase { private static final List datasetValues = Arrays.asList(1,2,3,4,5,6,7,8,9,10); public void testMatchAllDocs() throws IOException { - test(0, List.of(Double.NaN, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0)); + check(0, List.of(Double.NaN, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0)); } public void testShift() throws IOException { - test(1, List.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0)); - test(5, List.of(5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 10.0, 10.0, Double.NaN, Double.NaN)); - test(-5, List.of(Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, 1.0, 2.0, 3.0, 4.0)); + check(1, List.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0)); + check(5, List.of(5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 10.0, 10.0, Double.NaN, Double.NaN)); + check(-5, List.of(Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN, 1.0, 2.0, 3.0, 4.0)); } public void testWideWindow() throws IOException { Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap()); MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 100); builder.setShift(50); - test(builder, script, List.of(10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0)); + check(builder, script, List.of(10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0)); } - private void test(int shift, List expected) throws IOException { + private void check(int shift, List expected) throws IOException { Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap()); MovFnPipelineAggregationBuilder builder = new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 3); builder.setShift(shift); - test(builder, script, expected); + check(builder, script, expected); } - private void test(MovFnPipelineAggregationBuilder builder, Script script, List expected) throws IOException { + private void check(MovFnPipelineAggregationBuilder builder, Script script, List expected) throws IOException { Query query = new MatchAllDocsQuery(); DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo"); aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD); From 8ee7790be7b70fff52d2f453e02729fe4d5cb1b4 Mon Sep 17 00:00:00 2001 From: Nikita_Glashchenko Date: Tue, 16 Jul 2019 20:35:34 +0400 Subject: [PATCH 6/7] Update docs --- .../pipeline/movfn-aggregation.asciidoc | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc index ea414237174e6..3dd21e57f38ae 100644 --- a/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc @@ -24,14 +24,15 @@ A `moving_fn` aggregation looks like this in isolation: -------------------------------------------------- // NOTCONSOLE -[[moving-avg-params]] -.`moving_avg` Parameters +[[moving-fn-params]] +.`moving_fn` Parameters [options="header"] |=== |Parameter Name |Description |Required |Default Value |`buckets_path` |Path to the metric of interest (see <> for more details |Required | |`window` |The size of window to "slide" across the histogram. |Required | |`script` |The script that should be executed on each window of data |Required | +|`shift` |<> of window position. |Optional | 0 |=== `moving_fn` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation. They can be @@ -169,6 +170,16 @@ POST /_search // CONSOLE // TEST[setup:sales] +[[shift-parameter]] +==== shift parameter + +By default (with `shift = 0`), the window that is offered for calculation is the last `n` values excluding the current bucket. +Increasing `shift` by 1 moves starting window position by `1` to the right. + +- To include current bucket to the window, use `shift = 1`. +- For center alignment (`n / 2` values before and after the current bucket), use `shift = window / 2`. +- For right alignment (`n` values after the current bucket), use `shift = window`. + ==== Pre-built Functions For convenience, a number of functions have been prebuilt and are available inside the `moving_fn` script context: From 89254691a8fb0e054ea315d25de493383e2d42ca Mon Sep 17 00:00:00 2001 From: Nikita_Glashchenko Date: Wed, 17 Jul 2019 21:29:12 +0500 Subject: [PATCH 7/7] Fix review comments --- .../aggregations/pipeline/movfn-aggregation.asciidoc | 2 ++ .../pipeline/MovFnPipelineAggregationBuilder.java | 4 ++-- .../search/aggregations/pipeline/MovFnPipelineAggregator.java | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc index 3dd21e57f38ae..cdea58d45ae07 100644 --- a/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc @@ -180,6 +180,8 @@ Increasing `shift` by 1 moves starting window position by `1` to the right. - For center alignment (`n / 2` values before and after the current bucket), use `shift = window / 2`. - For right alignment (`n` values after the current bucket), use `shift = window`. +If either of window edges moves outside the borders of data series, the window shrinks to include available values only. + ==== Pre-built Functions For convenience, a number of functions have been prebuilt and are available inside the `moving_fn` script context: diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java index 5cacdf5bdb6da..7d56197cc46a6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java @@ -101,7 +101,7 @@ public MovFnPipelineAggregationBuilder(StreamInput in) throws IOException { format = in.readOptionalString(); gapPolicy = GapPolicy.readFrom(in); window = in.readInt(); - if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport shift = in.readInt(); } else { shift = 0; @@ -115,7 +115,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeOptionalString(format); gapPolicy.writeTo(out); out.writeInt(window); - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport out.writeInt(shift); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java index 9b1c8dbe98b18..b0915350c2631 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregator.java @@ -83,7 +83,7 @@ public MovFnPipelineAggregator(StreamInput in) throws IOException { gapPolicy = BucketHelpers.GapPolicy.readFrom(in); bucketsPath = in.readString(); window = in.readInt(); - if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport shift = in.readInt(); } else { shift = 0; @@ -97,7 +97,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { gapPolicy.writeTo(out); out.writeString(bucketsPath); out.writeInt(window); - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO change this after backport out.writeInt(shift); } }