From 8aa9fe38a94ca65d72db2e1327f46c197df75415 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 1 Nov 2018 10:56:23 -0500 Subject: [PATCH 1/8] ML: Adding missing data check class --- .../ml/integration/DelayedDataDetectorIT.java | 265 ++++++++++++++++++ .../xpack/ml/datafeed/DatafeedJob.java | 1 + .../xpack/ml/datafeed/DatafeedManager.java | 6 + .../ml/datafeed/DelayedDataDetector.java | 123 ++++++++ 4 files changed, 395 insertions(+) create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java new file mode 100644 index 0000000000000..2fda75d129164 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -0,0 +1,265 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Request; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.test.BackgroundIndexer; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; +import org.elasticsearch.xpack.core.ml.action.util.PageParams; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts; +import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs; +import static org.hamcrest.Matchers.equalTo; + +public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase { + + private String index = "delayed-data"; + private long now = System.currentTimeMillis(); + private long numDocs; + + @Before + public void putDataintoIndex() { + client().admin().indices().prepareCreate(index) + .addMapping("type", "time", "type=date", "value", "type=long") + .get(); + numDocs = randomIntBetween(32, 2048); + long oneDayAgo = now - 86400000; + writeData(logger, index, numDocs, oneDayAgo, now); + } + + @After + public void cleanUpTest() { + cleanUp(); + } + + public void testWithoutMissing() throws Exception { + final String jobId = "delayed-data-detection-job-no-missing-test"; + Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); + + DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + registerJob(job); + putJob(job); + openJob(job.getId()); + + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, now); + waitUntilJobIsClosed(jobId); + + // Get the latest finalized bucket + Bucket lastBucket = getLatestFinalizedBucket(jobId); + + DelayedDataDetector delayedDataDetector = + new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + + delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( + response -> assertThat(response, equalTo(0L)), + e -> fail() + )); + } + + public void testWithMissing() throws Exception { + final String jobId = "delayed-data-detection-job-missing-test"; + Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); + + DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + registerJob(job); + putJob(job); + openJob(job.getId()); + + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + + startDatafeed(datafeedConfig.getId(), 0L, now); + waitUntilJobIsClosed(jobId); + + // Get the latest finalized bucket + Bucket lastBucket = getLatestFinalizedBucket(jobId); + + DelayedDataDetector delayedDataDetector = + new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + + long missingDocs = randomIntBetween(32, 2048); + writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); + + delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( + response -> assertThat(response, equalTo(missingDocs)), + e -> fail() + )); + } + + public void testWithoutMissingAndAggregationsAndQuery() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(10); + final String jobId = "delayed-data-detection-job-aggs-no-missing-test"; + Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count"); + + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value"); + DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator( + AggregationBuilders.histogram("time") + .subAggregation(maxTime) + .subAggregation(avgAggregationBuilder) + .field("time") + .interval(TimeValue.timeValueMinutes(5).millis()))); + datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gt(numDocs/2)); + datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + registerJob(job); + putJob(job); + openJob(job.getId()); + + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, now); + waitUntilJobIsClosed(jobId); + + // Get the latest finalized bucket + Bucket lastBucket = getLatestFinalizedBucket(jobId); + + DelayedDataDetector delayedDataDetector = + new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + + delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( + response -> assertThat(response, equalTo(0L)), + e -> fail() + )); + } + + public void testWithMissingAndAggregationsAndQuery() throws Exception { + TimeValue bucketSpan = TimeValue.timeValueMinutes(10); + final String jobId = "delayed-data-detection-job-aggs-and-missing-test"; + Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count"); + + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value"); + DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator( + AggregationBuilders.histogram("time") + .subAggregation(maxTime) + .subAggregation(avgAggregationBuilder) + .field("time") + .interval(TimeValue.timeValueMinutes(5).millis()))); + datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); + datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gt(numDocs/2)); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + registerJob(job); + putJob(job); + openJob(job.getId()); + + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, now); + waitUntilJobIsClosed(jobId); + + // Get the latest finalized bucket + Bucket lastBucket = getLatestFinalizedBucket(jobId); + + DelayedDataDetector delayedDataDetector = + new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + + long missingDocs = numDocs; + writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); + + delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( + response -> assertThat(response, equalTo(missingDocs/2)), + e -> fail() + )); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) { + return createJob(id, bucketSpan, function, field, null); + } + + private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeField("time"); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder(function, field); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(bucketSpan); + analysisConfig.setSummaryCountFieldName(summaryCountField); + + Job.Builder builder = new Job.Builder(); + builder.setId(id); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder; + } + + private void writeData(Logger logger, String index, long numDocs, long start, long end) { + int maxDelta = (int) (end - start - 1); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + for (int i = 0; i < numDocs; i++) { + IndexRequest indexRequest = new IndexRequest(index, "type"); + long timestamp = start + randomIntBetween(0, maxDelta); + assert timestamp >= start && timestamp < end; + indexRequest.source("time", timestamp, "value", i); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + if (bulkResponse.hasFailures()) { + int failures = 0; + for (BulkItemResponse itemResponse : bulkResponse) { + if (itemResponse.isFailed()) { + failures++; + logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); + } + } + fail("Bulk response contained " + failures + " failures"); + } + logger.info("Indexed [{}] documents", numDocs); + } + + private Bucket getLatestFinalizedBucket(String jobId) { + GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId); + getBucketsRequest.setExcludeInterim(true); + getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName()); + getBucketsRequest.setDescending(true); + getBucketsRequest.setPageParams(new PageParams(0, 1)); + return getBuckets(getBucketsRequest).get(0); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 54a79ee199ee1..5af564b8b9133 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.messages.Messages; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 9f4191b38f2de..190acbdf53531 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -282,6 +282,7 @@ public class Holder { private final ProblemTracker problemTracker; private final Consumer handler; volatile Future future; + volatile Future futureDataCheck; private volatile boolean isRelocating; Holder(TransportStartDatafeedAction.DatafeedTask task, DatafeedConfig datafeed, DatafeedJob datafeedJob, @@ -326,6 +327,7 @@ public void stop(String source, TimeValue timeout, Exception e) { datafeed.getJobId(), acquired); runningDatafeedsOnThisNode.remove(allocationId); FutureUtils.cancel(future); + FutureUtils.cancel(futureDataCheck); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); handler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeed.getId(), datafeed.getJobId(), @@ -381,6 +383,10 @@ private long executeRealTime() throws Exception { } } + private void executeMissingDataCheck() throws Exception { + + } + private void closeJob() { ClusterState clusterState = clusterService.state(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java new file mode 100644 index 0000000000000..acf795a200cf4 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java @@ -0,0 +1,123 @@ +package org.elasticsearch.xpack.ml.datafeed; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.action.util.PageParams; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.utils.Intervals; +import org.joda.time.DateTime; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + +/** + * This class will search the buckets and indices over a given window to determine if any data is missing + */ +public class DelayedDataDetector { + + private static final String DATE_BUCKETS = "date_buckets"; + private final long bucketSpan; + private final long window; + private final DatafeedConfig datafeedConfig; + private final Client client; + private final Job job; + + public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue window, Client client) { + this.job = job; + this.bucketSpan = job.getAnalysisConfig().getBucketSpan().millis(); + this.datafeedConfig = datafeedConfig; + long windowMillis = window.millis(); + if (windowMillis < bucketSpan) { + throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]"); + } + this.window = windowMillis; + this.client = client; + } + + public void missingData(long latestFinalizedBucket, ActionListener listener) { + final long end = Intervals.alignToFloor(latestFinalizedBucket, bucketSpan); + final long start = Intervals.alignToFloor(latestFinalizedBucket - window, bucketSpan); + checkBucketEvents(start, end, ActionListener.wrap( + finalizedBuckets -> checkTrueData(start, end, ActionListener.wrap( + indexedData -> listener.onResponse(finalizedBuckets.entrySet() + .stream() + .map((entry) -> indexedData.getOrDefault(entry.getKey(), 0L) - entry.getValue()) + .filter(v -> v > 0) + .collect(Collectors.summingLong(Long::longValue))), + listener::onFailure) + ), + listener::onFailure + )); + } + + private void checkBucketEvents(long start, long end, ActionListener> listener) { + GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId()); + request.setStart(Long.toString(start)); + request.setEnd(Long.toString(end)); + request.setExcludeInterim(true); + request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan))); + + ClientHelper.executeAsyncWithOrigin(client, ClientHelper.ML_ORIGIN, GetBucketsAction.INSTANCE, request, ActionListener.wrap( + response -> { + Map map = new HashMap<>((int)response.getBuckets().count()); + response.getBuckets().results().forEach(bucket -> map.put(bucket.getEpoch() * 1000, bucket.getEventCount())); + listener.onResponse(map); + }, + listener::onFailure + )); + } + + private void checkTrueData(long start, long end, ActionListener> listener) { + String timeField = job.getDataDescription().getTimeField(); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .size(0) + .aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).interval(bucketSpan).field(timeField)) + .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end)); + + SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder); + ClientHelper.executeAsyncWithOrigin(client, ClientHelper.ML_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.wrap( + response -> { + List buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets(); + Map hashMap = new HashMap<>(buckets.size()); + for (Histogram.Bucket bucket : buckets) { + long bucketTime = toHistogramKeyToEpoch(bucket.getKey()); + if (bucketTime < 0) { + listener.onFailure( + new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp")); + return; + } + hashMap.put(bucketTime, bucket.getDocCount()); + } + listener.onResponse(hashMap); + }, + listener::onFailure + )); + } + + private static long toHistogramKeyToEpoch(Object key) { + if (key instanceof DateTime) { + return ((DateTime)key).getMillis(); + } else if (key instanceof Double) { + return ((Double)key).longValue(); + } else if (key instanceof Long){ + return (Long)key; + } else { + return -1L; + } + } +} From feb19081cd61085b0ed00827ae875e69379872a2 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 5 Nov 2018 16:34:59 -0600 Subject: [PATCH 2/8] reverting bad change --- .../elasticsearch/xpack/ml/datafeed/DatafeedManager.java | 6 ------ .../xpack/ml/datafeed/DelayedDataDetector.java | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 190acbdf53531..9f4191b38f2de 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -282,7 +282,6 @@ public class Holder { private final ProblemTracker problemTracker; private final Consumer handler; volatile Future future; - volatile Future futureDataCheck; private volatile boolean isRelocating; Holder(TransportStartDatafeedAction.DatafeedTask task, DatafeedConfig datafeed, DatafeedJob datafeedJob, @@ -327,7 +326,6 @@ public void stop(String source, TimeValue timeout, Exception e) { datafeed.getJobId(), acquired); runningDatafeedsOnThisNode.remove(allocationId); FutureUtils.cancel(future); - FutureUtils.cancel(futureDataCheck); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); handler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeed.getId(), datafeed.getJobId(), @@ -383,10 +381,6 @@ private long executeRealTime() throws Exception { } } - private void executeMissingDataCheck() throws Exception { - - } - private void closeJob() { ClusterState clusterState = clusterService.state(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java index acf795a200cf4..8bea9d97e7657 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java @@ -29,7 +29,7 @@ * This class will search the buckets and indices over a given window to determine if any data is missing */ public class DelayedDataDetector { - + private static final String DATE_BUCKETS = "date_buckets"; private final long bucketSpan; private final long window; From d8332bed0b8e567343eeab97f2062e7e280f2ab3 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 6 Nov 2018 10:06:38 -0600 Subject: [PATCH 3/8] Adding bucket + missing data object for returns --- .../ml/integration/DelayedDataDetectorIT.java | 66 +++++++++++++------ .../ml/datafeed/DelayedDataDetector.java | 49 ++++++++++---- 2 files changed, 82 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java index 2fda75d129164..b0d5e4d2bbc2c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -12,42 +12,31 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.client.Request; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; -import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; -import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector; +import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector.BucketWithMissingData; import org.junit.After; import org.junit.Before; import java.util.Collections; -import java.util.Comparator; import java.util.Date; -import java.util.List; -import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder; -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts; -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs; import static org.hamcrest.Matchers.equalTo; public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase { @@ -61,7 +50,7 @@ public void putDataintoIndex() { client().admin().indices().prepareCreate(index) .addMapping("type", "time", "type=date", "value", "type=long") .get(); - numDocs = randomIntBetween(32, 2048); + numDocs = randomIntBetween(32, 128); long oneDayAgo = now - 86400000; writeData(logger, index, numDocs, oneDayAgo, now); } @@ -92,7 +81,7 @@ public void testWithoutMissing() throws Exception { new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response, equalTo(0L)), + response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)), e -> fail() )); } @@ -118,11 +107,50 @@ public void testWithMissing() throws Exception { DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); - long missingDocs = randomIntBetween(32, 2048); + long missingDocs = randomIntBetween(32, 128); writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response, equalTo(missingDocs)), + response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(missingDocs)), + e -> fail() + )); + } + + public void testWithMissingInSpecificBucket() throws Exception { + final String jobId = "delayed-data-detection-job-missing-test"; + Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); + + DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + registerJob(job); + putJob(job); + openJob(job.getId()); + + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + + startDatafeed(datafeedConfig.getId(), 0L, now); + waitUntilJobIsClosed(jobId); + + // Get the latest finalized bucket + Bucket lastBucket = getLatestFinalizedBucket(jobId); + + DelayedDataDetector delayedDataDetector = + new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + + long missingDocs = randomIntBetween(1, 10); + writeData(logger, index, missingDocs, (lastBucket.getEpoch() - lastBucket.getBucketSpan())*1000, lastBucket.getEpoch()*1000); + + delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( + response -> { + boolean hasBucketWithMissing = false; + for (BucketWithMissingData bucketWithMissingData : response) { + if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) { + assertThat(bucketWithMissingData.getMissingData(), equalTo(missingDocs)); + hasBucketWithMissing = true; + } + } + assertThat(hasBucketWithMissing, equalTo(true)); + }, e -> fail() )); } @@ -160,7 +188,7 @@ public void testWithoutMissingAndAggregationsAndQuery() throws Exception { new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response, equalTo(0L)), + response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)), e -> fail() )); } @@ -180,7 +208,7 @@ public void testWithMissingAndAggregationsAndQuery() throws Exception { .field("time") .interval(TimeValue.timeValueMinutes(5).millis()))); datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); - datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gt(numDocs/2)); + datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2)); DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); registerJob(job); putJob(job); @@ -201,7 +229,7 @@ public void testWithMissingAndAggregationsAndQuery() throws Exception { writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response, equalTo(missingDocs/2)), + response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo((missingDocs+1)/2)), e -> fail() )); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java index 8bea9d97e7657..75ba603a35980 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java @@ -1,7 +1,5 @@ package org.elasticsearch.xpack.ml.datafeed; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; @@ -16,6 +14,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.utils.Intervals; import org.joda.time.DateTime; @@ -49,23 +48,22 @@ public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue win this.client = client; } - public void missingData(long latestFinalizedBucket, ActionListener listener) { + public void missingData(long latestFinalizedBucket, ActionListener> listener) { final long end = Intervals.alignToFloor(latestFinalizedBucket, bucketSpan); final long start = Intervals.alignToFloor(latestFinalizedBucket - window, bucketSpan); checkBucketEvents(start, end, ActionListener.wrap( finalizedBuckets -> checkTrueData(start, end, ActionListener.wrap( - indexedData -> listener.onResponse(finalizedBuckets.entrySet() - .stream() - .map((entry) -> indexedData.getOrDefault(entry.getKey(), 0L) - entry.getValue()) - .filter(v -> v > 0) - .collect(Collectors.summingLong(Long::longValue))), + indexedData -> listener.onResponse(finalizedBuckets.stream() + .filter(bucket -> calculateMissing(indexedData, bucket) > 0) + .map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket)) + .collect(Collectors.toList())), listener::onFailure) ), listener::onFailure )); } - private void checkBucketEvents(long start, long end, ActionListener> listener) { + private void checkBucketEvents(long start, long end, ActionListener> listener) { GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId()); request.setStart(Long.toString(start)); request.setEnd(Long.toString(end)); @@ -73,11 +71,7 @@ private void checkBucketEvents(long start, long end, ActionListener { - Map map = new HashMap<>((int)response.getBuckets().count()); - response.getBuckets().results().forEach(bucket -> map.put(bucket.getEpoch() * 1000, bucket.getEventCount())); - listener.onResponse(map); - }, + response -> listener.onResponse(response.getBuckets().results()), listener::onFailure )); } @@ -120,4 +114,31 @@ private static long toHistogramKeyToEpoch(Object key) { return -1L; } } + + private static long calculateMissing(Map indexedData, Bucket bucket) { + return indexedData.getOrDefault(bucket.getEpoch() * 1000, 0L) - bucket.getEventCount(); + } + + public static class BucketWithMissingData { + + private final long missingData; + private final Bucket bucket; + + static BucketWithMissingData fromMissingAndBucket(long missingData, Bucket bucket) { + return new BucketWithMissingData(missingData, bucket); + } + + private BucketWithMissingData(long missingData, Bucket bucket) { + this.missingData = missingData; + this.bucket = bucket; + } + + public Bucket getBucket() { + return bucket; + } + + public long getMissingData() { + return missingData; + } + } } From 555b42dc65c742db7d01fd203689e1ad654e2e94 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 6 Nov 2018 10:07:38 -0600 Subject: [PATCH 4/8] reverting unnecessary change --- .../java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 5af564b8b9133..54a79ee199ee1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -19,7 +19,6 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.messages.Messages; From d973f21a6afb546297ed61b82aab08ee30b839ab Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 6 Nov 2018 10:15:19 -0600 Subject: [PATCH 5/8] adding license header --- .../elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java index 75ba603a35980..bcd9e1dbfc034 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java @@ -1,3 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.action.ActionListener; From 1754f952536d3f1fcccae234da48d4db039bc146 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 6 Nov 2018 14:39:21 -0600 Subject: [PATCH 6/8] Make client calls synchronous, akin to DatafeedJob --- .../ml/integration/DelayedDataDetectorIT.java | 114 ++++-------------- .../ml/datafeed/DelayedDataDetector.java | 65 +++++----- 2 files changed, 53 insertions(+), 126 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java index b0d5e4d2bbc2c..f634e84ee9d15 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.integration; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -34,6 +33,7 @@ import java.util.Collections; import java.util.Date; +import java.util.List; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed; import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder; @@ -60,8 +60,8 @@ public void cleanUpTest() { cleanUp(); } - public void testWithoutMissing() throws Exception { - final String jobId = "delayed-data-detection-job-no-missing-test"; + public void test() throws Exception { + final String jobId = "delayed-data-detection-job"; Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); @@ -80,44 +80,18 @@ public void testWithoutMissing() throws Exception { DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); - delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)), - e -> fail() - )); - } - - public void testWithMissing() throws Exception { - final String jobId = "delayed-data-detection-job-missing-test"; - Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); - - DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); - registerJob(job); - putJob(job); - openJob(job.getId()); - - registerDatafeed(datafeedConfig); - putDatafeed(datafeedConfig); - - startDatafeed(datafeedConfig.getId(), 0L, now); - waitUntilJobIsClosed(jobId); - - // Get the latest finalized bucket - Bucket lastBucket = getLatestFinalizedBucket(jobId); - - DelayedDataDetector delayedDataDetector = - new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + List response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)); long missingDocs = randomIntBetween(32, 128); writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); - delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(missingDocs)), - e -> fail() - )); + response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(missingDocs)); } public void testWithMissingInSpecificBucket() throws Exception { - final String jobId = "delayed-data-detection-job-missing-test"; + final String jobId = "delayed-data-detection-job-missing-test-specific-bucket"; Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); @@ -139,23 +113,18 @@ public void testWithMissingInSpecificBucket() throws Exception { long missingDocs = randomIntBetween(1, 10); writeData(logger, index, missingDocs, (lastBucket.getEpoch() - lastBucket.getBucketSpan())*1000, lastBucket.getEpoch()*1000); - - delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> { - boolean hasBucketWithMissing = false; - for (BucketWithMissingData bucketWithMissingData : response) { - if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) { - assertThat(bucketWithMissingData.getMissingData(), equalTo(missingDocs)); - hasBucketWithMissing = true; - } - } - assertThat(hasBucketWithMissing, equalTo(true)); - }, - e -> fail() - )); + List response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); + boolean hasBucketWithMissing = false; + for (BucketWithMissingData bucketWithMissingData : response) { + if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) { + assertThat(bucketWithMissingData.getMissingData(), equalTo(missingDocs)); + hasBucketWithMissing = true; + } + } + assertThat(hasBucketWithMissing, equalTo(true)); } - public void testWithoutMissingAndAggregationsAndQuery() throws Exception { + public void testMissingWithAggregationsAndQuery() throws Exception { TimeValue bucketSpan = TimeValue.timeValueMinutes(10); final String jobId = "delayed-data-detection-job-aggs-no-missing-test"; Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count"); @@ -169,7 +138,7 @@ public void testWithoutMissingAndAggregationsAndQuery() throws Exception { .subAggregation(avgAggregationBuilder) .field("time") .interval(TimeValue.timeValueMinutes(5).millis()))); - datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gt(numDocs/2)); + datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2)); datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); registerJob(job); @@ -187,51 +156,14 @@ public void testWithoutMissingAndAggregationsAndQuery() throws Exception { DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); - delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)), - e -> fail() - )); - } - - public void testWithMissingAndAggregationsAndQuery() throws Exception { - TimeValue bucketSpan = TimeValue.timeValueMinutes(10); - final String jobId = "delayed-data-detection-job-aggs-and-missing-test"; - Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count"); - - MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value"); - DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); - datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator( - AggregationBuilders.histogram("time") - .subAggregation(maxTime) - .subAggregation(avgAggregationBuilder) - .field("time") - .interval(TimeValue.timeValueMinutes(5).millis()))); - datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); - datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2)); - DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); - registerJob(job); - putJob(job); - openJob(job.getId()); - - registerDatafeed(datafeedConfig); - putDatafeed(datafeedConfig); - startDatafeed(datafeedConfig.getId(), 0L, now); - waitUntilJobIsClosed(jobId); - - // Get the latest finalized bucket - Bucket lastBucket = getLatestFinalizedBucket(jobId); - - DelayedDataDetector delayedDataDetector = - new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + List response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)); long missingDocs = numDocs; writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); - delayedDataDetector.missingData(lastBucket.getEpoch()*1000, ActionListener.wrap( - response -> assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo((missingDocs+1)/2)), - e -> fail() - )); + response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo((missingDocs+1)/2)); } private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java index bcd9e1dbfc034..46786159207b7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java @@ -5,15 +5,15 @@ */ package org.elasticsearch.xpack.ml.datafeed; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -28,6 +28,9 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; + /** * This class will search the buckets and indices over a given window to determine if any data is missing @@ -53,35 +56,31 @@ public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue win this.client = client; } - public void missingData(long latestFinalizedBucket, ActionListener> listener) { + public List missingData(long latestFinalizedBucket) { final long end = Intervals.alignToFloor(latestFinalizedBucket, bucketSpan); final long start = Intervals.alignToFloor(latestFinalizedBucket - window, bucketSpan); - checkBucketEvents(start, end, ActionListener.wrap( - finalizedBuckets -> checkTrueData(start, end, ActionListener.wrap( - indexedData -> listener.onResponse(finalizedBuckets.stream() - .filter(bucket -> calculateMissing(indexedData, bucket) > 0) - .map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket)) - .collect(Collectors.toList())), - listener::onFailure) - ), - listener::onFailure - )); + List finalizedBuckets = checkBucketEvents(start, end); + Map indexedData = checkTrueData(start, end); + return finalizedBuckets.stream() + .filter(bucket -> calculateMissing(indexedData, bucket) > 0) + .map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket)) + .collect(Collectors.toList()); } - private void checkBucketEvents(long start, long end, ActionListener> listener) { + private List checkBucketEvents(long start, long end) { GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId()); request.setStart(Long.toString(start)); request.setEnd(Long.toString(end)); request.setExcludeInterim(true); request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan))); - ClientHelper.executeAsyncWithOrigin(client, ClientHelper.ML_ORIGIN, GetBucketsAction.INSTANCE, request, ActionListener.wrap( - response -> listener.onResponse(response.getBuckets().results()), - listener::onFailure - )); + try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { + GetBucketsAction.Response response = client.execute(GetBucketsAction.INSTANCE, request).actionGet(); + return response.getBuckets().results(); + } } - private void checkTrueData(long start, long end, ActionListener> listener) { + private Map checkTrueData(long start, long end) { String timeField = job.getDataDescription().getTimeField(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .size(0) @@ -89,23 +88,19 @@ private void checkTrueData(long start, long end, ActionListener> .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end)); SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder); - ClientHelper.executeAsyncWithOrigin(client, ClientHelper.ML_ORIGIN, SearchAction.INSTANCE, searchRequest, ActionListener.wrap( - response -> { - List buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets(); - Map hashMap = new HashMap<>(buckets.size()); - for (Histogram.Bucket bucket : buckets) { - long bucketTime = toHistogramKeyToEpoch(bucket.getKey()); - if (bucketTime < 0) { - listener.onFailure( - new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp")); - return; - } - hashMap.put(bucketTime, bucket.getDocCount()); + try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { + SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); + List buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets(); + Map hashMap = new HashMap<>(buckets.size()); + for (Histogram.Bucket bucket : buckets) { + long bucketTime = toHistogramKeyToEpoch(bucket.getKey()); + if (bucketTime < 0) { + throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp"); } - listener.onResponse(hashMap); - }, - listener::onFailure - )); + hashMap.put(bucketTime, bucket.getDocCount()); + } + return hashMap; + } } private static long toHistogramKeyToEpoch(Object key) { From 75673e465c904b9942829e747ab5e6e8bf19fd2f Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 7 Nov 2018 07:09:05 -0600 Subject: [PATCH 7/8] Fixing line length --- .../xpack/ml/integration/DelayedDataDetectorIT.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java index f634e84ee9d15..53b395a242a8d 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -131,7 +131,9 @@ public void testMissingWithAggregationsAndQuery() throws Exception { MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value"); - DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", + job.getId(), + Collections.singletonList(index)); datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("time") .subAggregation(maxTime) From 3101167e7e8d74facf8050bab6500be381a91438 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 7 Nov 2018 09:08:15 -0600 Subject: [PATCH 8/8] Renaming things, addressing PR comments --- .../ml/integration/DelayedDataDetectorIT.java | 33 ++++---- .../ml/datafeed/DelayedDataDetector.java | 40 ++++++---- .../ml/datafeed/DelayedDataDetectorTests.java | 76 +++++++++++++++++++ 3 files changed, 123 insertions(+), 26 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java index 53b395a242a8d..c672fadde7812 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -60,7 +60,7 @@ public void cleanUpTest() { cleanUp(); } - public void test() throws Exception { + public void testMissingDataDetection() throws Exception { final String jobId = "delayed-data-detection-job"; Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); @@ -80,17 +80,19 @@ public void test() throws Exception { DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); - List response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); - assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)); + List response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L)); long missingDocs = randomIntBetween(32, 128); + // Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window + // for the DelayedDataDetector writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); - response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); - assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(missingDocs)); + response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(missingDocs)); } - public void testWithMissingInSpecificBucket() throws Exception { + public void testMissingDataDetectionInSpecificBucket() throws Exception { final String jobId = "delayed-data-detection-job-missing-test-specific-bucket"; Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); @@ -112,19 +114,22 @@ public void testWithMissingInSpecificBucket() throws Exception { new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); long missingDocs = randomIntBetween(1, 10); + + // Write our missing data in the bucket right before the last finalized bucket writeData(logger, index, missingDocs, (lastBucket.getEpoch() - lastBucket.getBucketSpan())*1000, lastBucket.getEpoch()*1000); - List response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); + List response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); + boolean hasBucketWithMissing = false; for (BucketWithMissingData bucketWithMissingData : response) { if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) { - assertThat(bucketWithMissingData.getMissingData(), equalTo(missingDocs)); + assertThat(bucketWithMissingData.getMissingDocumentCount(), equalTo(missingDocs)); hasBucketWithMissing = true; } } assertThat(hasBucketWithMissing, equalTo(true)); } - public void testMissingWithAggregationsAndQuery() throws Exception { + public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception { TimeValue bucketSpan = TimeValue.timeValueMinutes(10); final String jobId = "delayed-data-detection-job-aggs-no-missing-test"; Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count"); @@ -158,14 +163,16 @@ public void testMissingWithAggregationsAndQuery() throws Exception { DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); - List response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); - assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo(0L)); + List response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L)); long missingDocs = numDocs; + // Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window + // for the DelayedDataDetector writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); - response = delayedDataDetector.missingData(lastBucket.getEpoch()*1000); - assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingData).sum(), equalTo((missingDocs+1)/2)); + response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); + assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo((missingDocs+1)/2)); } private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java index 46786159207b7..3c7c6ff963e07 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java @@ -52,16 +52,30 @@ public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue win if (windowMillis < bucketSpan) { throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]"); } + if (Intervals.alignToFloor(windowMillis/bucketSpan, bucketSpan) >= 10000) { + throw new IllegalArgumentException("[window] must contain less than 10000 buckets at the current [bucket_span]"); + } this.window = windowMillis; this.client = client; } - public List missingData(long latestFinalizedBucket) { - final long end = Intervals.alignToFloor(latestFinalizedBucket, bucketSpan); - final long start = Intervals.alignToFloor(latestFinalizedBucket - window, bucketSpan); + /** + * This method looks at the {@link DatafeedConfig} from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket}. + * + * It is done synchronously, and can block for a considerable amount of time, it should only be executed within the appropriate + * thread pool. + * + * @param latestFinalizedBucketMs The latest finalized bucket timestamp in milliseconds, signifies the end of the time window check + * @return A List of {@link BucketWithMissingData} objects that contain each bucket with the current number of missing docs + */ + public List detectMissingData(long latestFinalizedBucketMs) { + final long end = Intervals.alignToFloor(latestFinalizedBucketMs, bucketSpan); + final long start = Intervals.alignToFloor(latestFinalizedBucketMs - window, bucketSpan); List finalizedBuckets = checkBucketEvents(start, end); - Map indexedData = checkTrueData(start, end); + Map indexedData = checkCurrentBucketEventCount(start, end); return finalizedBuckets.stream() + // We only care about the situation when data is added to the indices + // Older data could have been removed from the indices, and should not be considered "missing data" .filter(bucket -> calculateMissing(indexedData, bucket) > 0) .map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket)) .collect(Collectors.toList()); @@ -80,7 +94,7 @@ private List checkBucketEvents(long start, long end) { } } - private Map checkTrueData(long start, long end) { + private Map checkCurrentBucketEventCount(long start, long end) { String timeField = job.getDataDescription().getTimeField(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .size(0) @@ -95,7 +109,7 @@ private Map checkTrueData(long start, long end) { for (Histogram.Bucket bucket : buckets) { long bucketTime = toHistogramKeyToEpoch(bucket.getKey()); if (bucketTime < 0) { - throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp"); + throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp"); } hashMap.put(bucketTime, bucket.getDocCount()); } @@ -121,15 +135,15 @@ private static long calculateMissing(Map indexedData, Bucket bucket) public static class BucketWithMissingData { - private final long missingData; + private final long missingDocumentCount; private final Bucket bucket; - static BucketWithMissingData fromMissingAndBucket(long missingData, Bucket bucket) { - return new BucketWithMissingData(missingData, bucket); + static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) { + return new BucketWithMissingData(missingDocumentCount, bucket); } - private BucketWithMissingData(long missingData, Bucket bucket) { - this.missingData = missingData; + private BucketWithMissingData(long missingDocumentCount, Bucket bucket) { + this.missingDocumentCount = missingDocumentCount; this.bucket = bucket; } @@ -137,8 +151,8 @@ public Bucket getBucket() { return bucket; } - public long getMissingData() { - return missingData; + public long getMissingDocumentCount() { + return missingDocumentCount; } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java new file mode 100644 index 0000000000000..9a54181af9ce6 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +import java.util.Collections; +import java.util.Date; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + + +public class DelayedDataDetectorTests extends ESTestCase { + + + public void testConstructorWithValueValues() { + TimeValue window = TimeValue.timeValueSeconds(10); + Job job = createJob(TimeValue.timeValueSeconds(1)); + DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job, createDatafeed(), window, mock(Client.class)); + assertNotNull(delayedDataDetector); + } + + public void testConstructorWithInvalidValues() { + TimeValue shortWindow = TimeValue.timeValueMillis(500); + Job job = createJob(TimeValue.timeValueSeconds(1)); + + Exception exception = expectThrows(IllegalArgumentException.class, + ()-> new DelayedDataDetector(job, createDatafeed(), shortWindow, mock(Client.class))); + assertThat(exception.getMessage(), equalTo("[window] must be greater or equal to the [bucket_span]")); + + TimeValue longWindow = TimeValue.timeValueSeconds(20000); + + exception = expectThrows(IllegalArgumentException.class, + ()-> new DelayedDataDetector(job, createDatafeed(), longWindow, mock(Client.class))); + assertThat(exception.getMessage(), equalTo("[window] must contain less than 10000 buckets at the current [bucket_span]")); + } + + + private Job createJob(TimeValue bucketSpan) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeField("time"); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(bucketSpan); + + Job.Builder builder = new Job.Builder(); + builder.setId("test-job"); + builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder.build(new Date()); + } + + private DatafeedConfig createDatafeed() { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("id", "jobId"); + builder.setIndices(Collections.singletonList("index1")); + builder.setTypes(Collections.singletonList("doc")); + return builder.build(); + } + + + +}