Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
}).collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), predictions[i], formatter, new ArrayList<PipelineAggregator>(), metaData()));

Bucket newBucket = factory.createBucket(newKey, 0, new InternalAggregations(aggs));
Bucket newBucket = factory.createBucket(newKey, bucket.getDocCount(), new InternalAggregations(aggs));

// Overwrite the existing bucket with the new version
newBuckets.set(lastValidPosition + i + 1, newBucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

package org.elasticsearch.search.aggregations.pipeline.moving.avg;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.collect.EvictingQueue;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
Expand All @@ -41,6 +44,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -67,6 +71,7 @@
public class MovAvgIT extends ESIntegTestCase {
private static final String INTERVAL_FIELD = "l_value";
private static final String VALUE_FIELD = "v_value";
private static final String VALUE_FIELD2 = "v_value2";

static int interval;
static int numBuckets;
Expand Down Expand Up @@ -1204,6 +1209,68 @@ public void testCheckIfTunableCanBeMinimized() {
}
}

public void testPredictWithNonEmptyBuckets() throws Exception {

createIndex("predict_non_empty");
BulkRequestBuilder bulkBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

for (int i = 0; i < 10; i++) {
bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource(
jsonBuilder().startObject().field(INTERVAL_FIELD, i)
.field(VALUE_FIELD, 10)
.field(VALUE_FIELD2, 10)
.endObject()));
}
for (int i = 10; i < 20; i++) {
// Extra so there is a bucket that only has second field
bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource(
jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD2, 10).endObject()));
}

bulkBuilder.execute().actionGet();
ensureSearchable();

SearchResponse response = client()
.prepareSearch("predict_non_empty")
.setTypes("type")
.addAggregation(
histogram("histo")
.field(INTERVAL_FIELD)
.interval(1)
.subAggregation(max("max").field(VALUE_FIELD))
.subAggregation(max("max2").field(VALUE_FIELD2))
.subAggregation(
movingAvg("movavg_values", "max")
.window(windowSize)
.modelBuilder(new SimpleModel.SimpleModelBuilder())
.gapPolicy(BucketHelpers.GapPolicy.SKIP).predict(5))).execute().actionGet();

assertSearchResponse(response);

Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(20));

SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
assertThat(current, nullValue());

for (int i = 1; i < 20; i++) {
Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat(bucket.getKey(), equalTo((double)i));
assertThat(bucket.getDocCount(), equalTo(1L));
SimpleValue movAvgAgg = bucket.getAggregations().get("movavg_values");
if (i < 15) {
assertThat(movAvgAgg, notNullValue());
assertThat(movAvgAgg.value(), equalTo(10d));
} else {
assertThat(movAvgAgg, nullValue());
}
}
}

private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) {
if (!expectedBucketIter.hasNext()) {
fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch");
Expand Down