@@ -0,0 +1,234 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.DelayedDataDetector.BucketWithMissingData;
import org.junit.After;
import org.junit.Before;

import java.util.Collections;
import java.util.Date;
import java.util.List;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder;
import static org.hamcrest.Matchers.equalTo;

public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase {

private String index = "delayed-data";
private long now = System.currentTimeMillis();
private long numDocs;

@Before
public void putDataIntoIndex() {
client().admin().indices().prepareCreate(index)
.addMapping("type", "time", "type=date", "value", "type=long")
.get();
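// Index a random number of documents spread over the last 24 hours so the job has data to analyse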
numDocs = randomIntBetween(32, 128);
long oneDayAgo = now - 86400000;
writeData(logger, index, numDocs, oneDayAgo, now);
}

@After
public void cleanUpTest() {
cleanUp();
}

public void testMissingDataDetection() throws Exception {
final String jobId = "delayed-data-detection-job";
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);

DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
registerJob(job);
putJob(job);
openJob(job.getId());

registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now);
waitUntilJobIsClosed(jobId);

// Get the latest finalized bucket
Bucket lastBucket = getLatestFinalizedBucket(jobId);

DelayedDataDetector delayedDataDetector =
new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());
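// The datafeed has already consumed everything up to the latest finalized bucket, so nothing should be reported missing yet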

List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));

long missingDocs = randomIntBetween(32, 128);
// Add data within the current delayed data detection window; the offset of 43100000 ms (just under 12 hours) is
// arbitrary, but keeps the new documents inside the window checked by the DelayedDataDetector
writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);

response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(missingDocs));
}

public void testMissingDataDetectionInSpecificBucket() throws Exception {
final String jobId = "delayed-data-detection-job-missing-test-specific-bucket";
Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null);

DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index));
registerJob(job);
putJob(job);
openJob(job.getId());

registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);

startDatafeed(datafeedConfig.getId(), 0L, now);
waitUntilJobIsClosed(jobId);

// Get the latest finalized bucket
Bucket lastBucket = getLatestFinalizedBucket(jobId);

DelayedDataDetector delayedDataDetector =
new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());

long missingDocs = randomIntBetween(1, 10);

// Write our missing data in the bucket right before the last finalized bucket
writeData(logger, index, missingDocs, (lastBucket.getEpoch() - lastBucket.getBucketSpan())*1000, lastBucket.getEpoch()*1000);
List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
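// Only the bucket immediately before the last finalized bucket should now report missing documents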

boolean hasBucketWithMissing = false;
for (BucketWithMissingData bucketWithMissingData : response) {
if (bucketWithMissingData.getBucket().getEpoch() == lastBucket.getEpoch() - lastBucket.getBucketSpan()) {
assertThat(bucketWithMissingData.getMissingDocumentCount(), equalTo(missingDocs));
hasBucketWithMissing = true;
}
}
assertThat(hasBucketWithMissing, equalTo(true));
}

public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception {
TimeValue bucketSpan = TimeValue.timeValueMinutes(10);
final String jobId = "delayed-data-detection-job-aggs-no-missing-test";
Job.Builder job = createJob(jobId, bucketSpan, "mean", "value", "doc_count");

MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("value").field("value");
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed",
job.getId(),
Collections.singletonList(index));
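// Aggregated datafeeds need a max aggregation on the time field inside the (date) histogram so the datafeed can track
// how far it has read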
datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time")
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.field("time")
.interval(TimeValue.timeValueMinutes(5).millis())));
datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
registerJob(job);
putJob(job);
openJob(job.getId());

registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, now);
waitUntilJobIsClosed(jobId);

// Get the latest finalized bucket
Bucket lastBucket = getLatestFinalizedBucket(jobId);

DelayedDataDetector delayedDataDetector =
new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client());

List<BucketWithMissingData> response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L));

long missingDocs = numDocs;
// Add data within the current delayed data detection window; the offset of 43100000 ms (just under 12 hours) is
// arbitrary, but keeps the new documents inside the window checked by the DelayedDataDetector
writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000);

response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000);
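// Only the added documents that match the datafeed query (value >= numDocs / 2) count as missing, i.e. roughly half of them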
assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo((missingDocs+1)/2));
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) {
return createJob(id, bucketSpan, function, field, null);
}

private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field, String summaryCountField) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
dataDescription.setTimeField("time");
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);

Detector.Builder d = new Detector.Builder(function, field);
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setBucketSpan(bucketSpan);
analysisConfig.setSummaryCountFieldName(summaryCountField);

Job.Builder builder = new Job.Builder();
builder.setId(id);
builder.setAnalysisConfig(analysisConfig);
builder.setDataDescription(dataDescription);
return builder;
}

private void writeData(Logger logger, String index, long numDocs, long start, long end) {
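// Index numDocs documents with random timestamps in [start, end) and values 0..numDocs-1, refreshing immediately so they
// are searchable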
int maxDelta = (int) (end - start - 1);
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < numDocs; i++) {
IndexRequest indexRequest = new IndexRequest(index, "type");
long timestamp = start + randomIntBetween(0, maxDelta);
assert timestamp >= start && timestamp < end;
indexRequest.source("time", timestamp, "value", i);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
if (bulkResponse.hasFailures()) {
int failures = 0;
for (BulkItemResponse itemResponse : bulkResponse) {
if (itemResponse.isFailed()) {
failures++;
logger.error("Item response failure [{}]", itemResponse.getFailureMessage());
}
}
fail("Bulk response contained " + failures + " failures");
}
logger.info("Indexed [{}] documents", numDocs);
}

private Bucket getLatestFinalizedBucket(String jobId) {
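// Fetch the single most recent non-interim bucket by sorting the job's results on timestamp, descending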
GetBucketsAction.Request getBucketsRequest = new GetBucketsAction.Request(jobId);
getBucketsRequest.setExcludeInterim(true);
getBucketsRequest.setSort(Result.TIMESTAMP.getPreferredName());
getBucketsRequest.setDescending(true);
getBucketsRequest.setPageParams(new PageParams(0, 1));
return getBuckets(getBucketsRequest).get(0);
}
}
@@ -0,0 +1,158 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;

import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.joda.time.DateTime;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin;


/**
* This class searches the job's finalized buckets and the datafeed's indices over a given window to determine whether any data is missing
*/
public class DelayedDataDetector {

private static final String DATE_BUCKETS = "date_buckets";
private final long bucketSpan;
private final long window;
private final DatafeedConfig datafeedConfig;
private final Client client;
private final Job job;

public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue window, Client client) {
this.job = job;
this.bucketSpan = job.getAnalysisConfig().getBucketSpan().millis();
this.datafeedConfig = datafeedConfig;
long windowMillis = window.millis();
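// The window must cover at least one bucket, but no more than 10000 buckets so that they can all be retrieved in a single page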
if (windowMillis < bucketSpan) {
throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]");
}
if (windowMillis / bucketSpan >= 10000) {
throw new IllegalArgumentException("[window] must contain less than 10000 buckets at the current [bucket_span]");
}
this.window = windowMillis;
this.client = client;
}

/**
* This method compares the job's finalized buckets against the data indexed in the datafeed's indices from
* {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket}.
*
* It runs synchronously and can block for a considerable amount of time, so it should only be executed within the
* appropriate thread pool.
*
* @param latestFinalizedBucketMs The latest finalized bucket timestamp in milliseconds, signifies the end of the time window check
* @return A List of {@link BucketWithMissingData} objects that contain each bucket with the current number of missing docs
*/
public List<BucketWithMissingData> detectMissingData(long latestFinalizedBucketMs) {
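// Align both ends of the window to bucket boundaries so the job's buckets and the indexed doc counts line up exactly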
final long end = Intervals.alignToFloor(latestFinalizedBucketMs, bucketSpan);
final long start = Intervals.alignToFloor(latestFinalizedBucketMs - window, bucketSpan);
List<Bucket> finalizedBuckets = checkBucketEvents(start, end);
Map<Long, Long> indexedData = checkCurrentBucketEventCount(start, end);
return finalizedBuckets.stream()
// We only care about cases where data has been added to the indices
// Older data could have been removed from the indices, and should not be considered "missing data"
.filter(bucket -> calculateMissing(indexedData, bucket) > 0)
.map(bucket -> BucketWithMissingData.fromMissingAndBucket(calculateMissing(indexedData, bucket), bucket))
.collect(Collectors.toList());
}

private List<Bucket> checkBucketEvents(long start, long end) {
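// Retrieve the job's finalized (non-interim) result buckets in the window; each carries the event count the job actually processed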
GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
request.setStart(Long.toString(start));
request.setEnd(Long.toString(end));
request.setExcludeInterim(true);
request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan)));

try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
GetBucketsAction.Response response = client.execute(GetBucketsAction.INSTANCE, request).actionGet();
return response.getBuckets().results();
}
}

private Map<Long, Long> checkCurrentBucketEventCount(long start, long end) {
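// Count the documents currently indexed per bucket_span interval, honouring the datafeed's own query and indices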
String timeField = job.getDataDescription().getTimeField();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.size(0)
.aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).interval(bucketSpan).field(timeField))
.query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end));

SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder);
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet();
List<? extends Histogram.Bucket> buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets();
Map<Long, Long> hashMap = new HashMap<>(buckets.size());
for (Histogram.Bucket bucket : buckets) {
long bucketTime = toHistogramKeyToEpoch(bucket.getKey());
if (bucketTime < 0) {
throw new IllegalStateException("Histogram key [" + bucket.getKey() + "] cannot be converted to a timestamp");
}
hashMap.put(bucketTime, bucket.getDocCount());
}
return hashMap;
}
}

private static long toHistogramKeyToEpoch(Object key) {
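// Date histogram keys are usually Joda DateTime objects, but numeric keys may also be returned; anything else is unexpected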
if (key instanceof DateTime) {
return ((DateTime)key).getMillis();
} else if (key instanceof Double) {
return ((Double)key).longValue();
} else if (key instanceof Long){
return (Long)key;
} else {
return -1L;
}
}

private static long calculateMissing(Map<Long, Long> indexedData, Bucket bucket) {
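// Bucket.getEpoch() is in epoch seconds whereas the histogram keys are epoch milliseconds, hence the multiplication by 1000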
return indexedData.getOrDefault(bucket.getEpoch() * 1000, 0L) - bucket.getEventCount();
}

public static class BucketWithMissingData {

private final long missingDocumentCount;
private final Bucket bucket;

static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) {
return new BucketWithMissingData(missingDocumentCount, bucket);
}

private BucketWithMissingData(long missingDocumentCount, Bucket bucket) {
this.missingDocumentCount = missingDocumentCount;
this.bucket = bucket;
}

public Bucket getBucket() {
return bucket;
}

public long getMissingDocumentCount() {
return missingDocumentCount;
}
}
}