From 83c78145950c67d719a8f7ef826f46333941bfe2 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 25 Apr 2018 13:49:01 +0100 Subject: [PATCH 1/4] [ML] Refactor DataStreamDiagnostics to use array This commit refactors the DataStreamDiagnostics class achieving the following advantages: - simpler code; by encapsulating the moving bucket histogram into its own class - better performance; by using an array to store the buckets instead of a map - explicit handling of gap buckets; in preparation of fixing #30080 --- .../ml/job/process/DataCountsReporter.java | 3 +- .../ml/job/process/DataStreamDiagnostics.java | 223 ------------------ .../process/diagnostics/BucketHistogram.java | 132 +++++++++++ .../diagnostics/DataStreamDiagnostics.java | 113 +++++++++ .../DataStreamDiagnosticsTests.java | 32 ++- 5 files changed, 274 insertions(+), 229 deletions(-) delete mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/{ => diagnostics}/DataStreamDiagnosticsTests.java (93%) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index debe1c36bbace..80223027e8ee0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -12,8 +12,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; +import org.elasticsearch.xpack.ml.job.process.diagnostics.DataStreamDiagnostics; import java.util.Date; import java.util.Locale; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java deleted file mode 100644 index b733204258689..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.job.process; - -import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.Counter; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.core.ml.job.config.Job; - -import java.util.Date; -import java.util.SortedMap; -import java.util.TreeMap; - -public class DataStreamDiagnostics { - - /** - * Minimum window to take into consideration for bucket count histogram. - */ - private static final int MIN_BUCKET_WINDOW = 10; - - /** - * Threshold to report potential sparsity problems. - * - * Sparsity score is calculated: log(average) - log(current) - * - * If score is above the threshold, bucket is reported as sparse bucket. - */ - private static final int DATA_SPARSITY_THRESHOLD = 2; - private static final long MS_IN_SECOND = 1000; - - private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class); - /** - * Container for the histogram - * - * Note: Using a sorted map in order to iterate in order when consuming the - * data. The counter is lazily initialized and potentially missing in case - * of empty buckets. - * - * The container gets pruned along the data streaming based on the bucket - * window, so it should not contain more than max(MIN_BUCKET_WINDOW, - * 'buckets_required_by_latency') + 1 items at any time. - * - * Sparsity can only be calculated after the window has been filled. Currently - * this window is lost if a job gets closed and re-opened. We might fix this - * in future. - */ - private final SortedMap movingBucketHistogram = new TreeMap<>(); - - private final long bucketSpan; - private final long latency; - private long movingBucketCount = 0; - private long latestReportedBucket = -1; - - private long bucketCount = 0; - private long emptyBucketCount = 0; - private long latestEmptyBucketTime = -1; - private long sparseBucketCount = 0; - private long latestSparseBucketTime = -1; - - public DataStreamDiagnostics(Job job) { - bucketSpan = job.getAnalysisConfig().getBucketSpan().seconds(); - latency = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().seconds(); - } - - /** - * Check record - * - * @param recordTimestampInMs - * The record timestamp in milliseconds since epoch - */ - - public void checkRecord(long recordTimestampInMs) { - checkBucketing(recordTimestampInMs); - } - - /** - * Flush all counters, should be called at the end of the data stream - */ - public void flush() { - // flush all we know - if (movingBucketHistogram.isEmpty() == false) { - flush(movingBucketHistogram.lastKey()); - } - } - - /** - * Check bucketing of record. Report empty and sparse buckets. - * - * @param recordTimestampInMs - * The record timestamp in milliseconds since epoch - */ - private void checkBucketing(long recordTimestampInMs) { - long bucket = (recordTimestampInMs / MS_IN_SECOND) / bucketSpan; - long bucketHistogramStartBucket = ((recordTimestampInMs / MS_IN_SECOND) - latency) / bucketSpan; - - bucketHistogramStartBucket = Math.min(bucket - MIN_BUCKET_WINDOW, bucketHistogramStartBucket); - - movingBucketHistogram.computeIfAbsent(bucket, l -> Counter.newCounter()).addAndGet(1); - ++movingBucketCount; - - // find the very first bucket - if (latestReportedBucket == -1) { - latestReportedBucket = bucket - 1; - } - - // flush all bucket out of the window - flush(bucketHistogramStartBucket); - } - - /** - * Flush Bucket reporting till the given bucket. - * - * @param bucketNumber - * The number of the last bucket that can be flushed. - */ - private void flush(long bucketNumber) { - - // check for a longer period of empty buckets - long emptyBuckets = movingBucketHistogram.firstKey() - latestReportedBucket - 1; - if (emptyBuckets > 0) { - bucketCount += emptyBuckets; - emptyBucketCount += emptyBuckets; - latestEmptyBucketTime = (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND; - latestReportedBucket = movingBucketHistogram.firstKey() - 1; - } - - // calculate the average number of data points in a bucket based on the - // current history - double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size(); - - // prune all buckets that can be flushed - long lastBucketSparsityCheck = Math.min(bucketNumber, movingBucketHistogram.lastKey()); - - for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) { - - Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); - long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L; - - LOGGER.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize); - ++bucketCount; - latestReportedBucket = pruneBucket; - - // substract bucketSize from the counter - movingBucketCount -= bucketSize; - - // check if bucket is empty - if (bucketSize == 0L) { - latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND; - ++emptyBucketCount; - - // do not do sparse analysis on an empty bucket - continue; - } - - // simplistic way to calculate data sparsity, just take the log and - // check the difference - double logAverageBucketSize = Math.log(averageBucketSize); - double logBucketSize = Math.log(bucketSize); - double sparsityScore = logAverageBucketSize - logBucketSize; - - if (sparsityScore > DATA_SPARSITY_THRESHOLD) { - LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize, - averageBucketSize, sparsityScore); - ++sparseBucketCount; - latestSparseBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND; - } - } - - // prune the rest if necessary - for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketNumber; ++pruneBucket) { - Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); - long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L; - - bucketCount++; - latestReportedBucket = pruneBucket; - - // substract bucketSize from the counter - movingBucketCount -= bucketSize; - - // check if bucket is empty - if (bucketSize == 0L) { - latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND; - ++emptyBucketCount; - } - } - } - - public long getBucketCount() { - return bucketCount; - } - - public long getEmptyBucketCount() { - return emptyBucketCount; - } - - public Date getLatestEmptyBucketTime() { - return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null; - } - - public long getSparseBucketCount() { - return sparseBucketCount; - } - - public Date getLatestSparseBucketTime() { - return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null; - } - - /** - * Resets counts, - * - * Note: This does not reset the inner state for e.g. sparse bucket - * detection. - * - */ - public void resetCounts() { - bucketCount = 0; - emptyBucketCount = 0; - sparseBucketCount = 0; - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java new file mode 100644 index 0000000000000..2b5e5f340f1f5 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.diagnostics; + +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.utils.Intervals; + +/** + * A moving window of buckets that allow keeping + * track of some statistics like the bucket count, + * empty or sparse buckets, etc. + * + * The counts are stored in an array that functions as a + * circular buffer. When time is advanced, all buckets + * out of the window are flushed. + */ +class BucketHistogram { + + private static final int MIN_BUCKETS = 10; + + private final long bucketSpanMs; + private final long latencyMs; + private final int maxSize; + private final long[] buckets; + private long movingBucketCount = 0; + private long latestBucketStartMs = -1; + private int latestBucketIndex; + private long earliestBucketStartMs = -1; + private int earliestBucketIndex; + private long latestFlushedBucketStartMs = -1; + private final BucketFlushListener bucketFlushListener; + + public BucketHistogram(Job job, BucketFlushListener bucketFlushListener) { + bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis(); + latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis(); + maxSize = Math.max((int) (latencyMs / bucketSpanMs), MIN_BUCKETS); + buckets = new long[maxSize]; + this.bucketFlushListener = bucketFlushListener; + } + + public void addRecord(long recordTimestampMs) { + long bucketStartMs = Intervals.alignToFloor(recordTimestampMs, bucketSpanMs); + + // Initialize earliest/latest times + if (latestBucketStartMs < 0) { + latestBucketStartMs = bucketStartMs; + earliestBucketStartMs = bucketStartMs; + } + + advanceTime(bucketStartMs); + addToBucket(bucketStartMs); + } + + private void advanceTime(long bucketStartMs) { + while (bucketStartMs > latestBucketStartMs) { + int flushBucketIndex = (latestBucketIndex + 1) % maxSize; + + if (flushBucketIndex == earliestBucketIndex) { + flush(flushBucketIndex); + movingBucketCount -= buckets[flushBucketIndex]; + earliestBucketStartMs += bucketSpanMs; + earliestBucketIndex = (earliestBucketIndex + 1) % maxSize; + } + buckets[flushBucketIndex] = 0L; + + latestBucketStartMs += bucketSpanMs; + latestBucketIndex = flushBucketIndex; + } + } + + private void addToBucket(long bucketStartMs) { + int offsetToLatest = (int) ((bucketStartMs - latestBucketStartMs) / bucketSpanMs); + int bucketIndex = (latestBucketIndex + offsetToLatest) % maxSize; + if (bucketIndex < 0) { + bucketIndex = maxSize + bucketIndex; + } + + ++buckets[bucketIndex]; + ++movingBucketCount; + + if (bucketStartMs < earliestBucketStartMs) { + earliestBucketStartMs = bucketStartMs; + earliestBucketIndex = bucketIndex; + } + } + + private void flush(int bucketIndex) { + long bucketStartMs = getTimestampMs(bucketIndex); + if (bucketStartMs > latestFlushedBucketStartMs) { + bucketFlushListener.onBucketFlush(bucketStartMs, buckets[bucketIndex]); + latestFlushedBucketStartMs = bucketStartMs; + } + } + + private long getTimestampMs(int bucketIndex) { + int offsetToLatest = latestBucketIndex - bucketIndex; + if (offsetToLatest < 0) { + offsetToLatest = maxSize + offsetToLatest; + } + return latestBucketStartMs - offsetToLatest * bucketSpanMs; + } + + public void flush() { + if (latestBucketStartMs < 0) { + return; + } + + int bucketIndex = earliestBucketIndex; + while (bucketIndex != latestBucketIndex) { + flush(bucketIndex); + bucketIndex = (bucketIndex + 1) % maxSize; + } + } + + public double averageBucketCount() { + return (double) movingBucketCount / size(); + } + + private int size() { + if (latestBucketStartMs < 0) { + return 0; + } + return (int) ((latestBucketStartMs - earliestBucketStartMs) / bucketSpanMs) + 1; + } + + public interface BucketFlushListener { + void onBucketFlush(long bucketStartMs, long bucketCounts); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java new file mode 100644 index 0000000000000..ecad635752e6b --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.diagnostics; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +import java.util.Date; + +public class DataStreamDiagnostics { + + /** + * Threshold to report potential sparsity problems. + * + * Sparsity score is calculated: log(average) - log(current) + * + * If score is above the threshold, bucket is reported as sparse bucket. + */ + private static final int DATA_SPARSITY_THRESHOLD = 2; + + private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class); + + private final BucketHistogram bucketHistogram; + + private long bucketCount = 0; + private long emptyBucketCount = 0; + private long latestEmptyBucketTime = -1; + private long sparseBucketCount = 0; + private long latestSparseBucketTime = -1; + + public DataStreamDiagnostics(Job job) { + bucketHistogram = new BucketHistogram(job, createBucketFlushListener()); + } + + private BucketHistogram.BucketFlushListener createBucketFlushListener() { + return (flushedBucketStartMs, flushedBucketCount) -> { + ++bucketCount; + if (flushedBucketCount == 0) { + ++emptyBucketCount; + latestEmptyBucketTime = flushedBucketStartMs; + } else { + // simplistic way to calculate data sparsity, just take the log and + // check the difference + double averageBucketSize = bucketHistogram.averageBucketCount(); + double logAverageBucketSize = Math.log(averageBucketSize); + double logBucketSize = Math.log(flushedBucketCount); + double sparsityScore = logAverageBucketSize - logBucketSize; + + if (sparsityScore > DATA_SPARSITY_THRESHOLD) { + LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", flushedBucketStartMs, + flushedBucketCount, averageBucketSize, sparsityScore); + ++sparseBucketCount; + latestSparseBucketTime = flushedBucketStartMs; + } + } + }; + } + + /** + * Check record + * + * @param recordTimestampInMs + * The record timestamp in milliseconds since epoch + */ + public void checkRecord(long recordTimestampInMs) { + bucketHistogram.addRecord(recordTimestampInMs); + } + + /** + * Flush all counters, should be called at the end of the data stream + */ + public void flush() { + // flush all we know + bucketHistogram.flush(); + } + + public long getBucketCount() { + return bucketCount; + } + + public long getEmptyBucketCount() { + return emptyBucketCount; + } + + public Date getLatestEmptyBucketTime() { + return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null; + } + + public long getSparseBucketCount() { + return sparseBucketCount; + } + + public Date getLatestSparseBucketTime() { + return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null; + } + + /** + * Resets counts, + * + * Note: This does not reset the inner state for e.g. sparse bucket + * detection. + * + */ + public void resetCounts() { + bucketCount = 0; + emptyBucketCount = 0; + sparseBucketCount = 0; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java similarity index 93% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java index 2c167f0df82cf..65ae398bda79c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.job.process.diagnostics; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -13,7 +13,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.junit.Before; -import java.io.IOException; import java.util.Arrays; import java.util.Date; @@ -21,9 +20,9 @@ public class DataStreamDiagnosticsTests extends ESTestCase { private static final long BUCKET_SPAN = 60000; private Job job; - + @Before - public void setUpMocks() throws IOException { + public void setUpMocks() { AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); acBuilder.setBucketSpan(TimeValue.timeValueMillis(BUCKET_SPAN)); acBuilder.setLatency(TimeValue.ZERO); @@ -80,6 +79,7 @@ public void testIncompleteBuckets() { assertEquals(null, d.getLatestSparseBucketTime()); assertEquals(new Date(BUCKET_SPAN * 2), d.getLatestEmptyBucketTime()); } + public void testSimple() { DataStreamDiagnostics d = new DataStreamDiagnostics(job); @@ -102,6 +102,28 @@ public void testSimple() { assertEquals(null, d.getLatestEmptyBucketTime()); } + public void testSimpleReverse() { + DataStreamDiagnostics d = new DataStreamDiagnostics(job); + + d.checkRecord(610000); + d.checkRecord(550000); + d.checkRecord(490000); + d.checkRecord(430000); + d.checkRecord(370000); + d.checkRecord(310000); + d.checkRecord(250000); + d.checkRecord(190000); + d.checkRecord(130000); + d.checkRecord(70000); + + d.flush(); + assertEquals(9, d.getBucketCount()); + assertEquals(0, d.getEmptyBucketCount()); + assertEquals(0, d.getSparseBucketCount()); + assertEquals(null, d.getLatestSparseBucketTime()); + assertEquals(null, d.getLatestEmptyBucketTime()); + } + public void testEmptyBuckets() { DataStreamDiagnostics d = new DataStreamDiagnostics(job); @@ -280,7 +302,7 @@ public void testEmptyBucketsLongerOutage() { /** * Send signals, make a longer period of sparse signals, then go up again - * + * * The number of sparse buckets should not be to much, it could be normal. */ public void testSparseBucketsLongerPeriod() { From 5ea9de4b22085023efa1d7d82d0a4b8c1783d6a0 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 25 Apr 2018 16:36:40 +0100 Subject: [PATCH 2/4] Fix latency --- .../process/diagnostics/BucketHistogram.java | 2 +- .../DataStreamDiagnosticsTests.java | 46 ++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java index 2b5e5f340f1f5..0120134349fe1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java @@ -36,7 +36,7 @@ class BucketHistogram { public BucketHistogram(Job job, BucketFlushListener bucketFlushListener) { bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis(); latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis(); - maxSize = Math.max((int) (latencyMs / bucketSpanMs), MIN_BUCKETS); + maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS); buckets = new long[maxSize]; this.bucketFlushListener = bucketFlushListener; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java index 65ae398bda79c..19f7f88c38fef 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java @@ -31,7 +31,7 @@ public void setUpMocks() { Job.Builder builder = new Job.Builder("job_id"); builder.setAnalysisConfig(acBuilder); builder.setDataDescription(new DataDescription.Builder()); - job = builder.build(new Date()); + job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null); } public void testIncompleteBuckets() { @@ -124,6 +124,36 @@ public void testSimpleReverse() { assertEquals(null, d.getLatestEmptyBucketTime()); } + public void testWithLatencyLessThanTenBuckets() { + job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN)); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); + + long timestamp = 70000; + while (timestamp < 70000 + 20 * BUCKET_SPAN) { + sendManyDataPoints(d, timestamp - BUCKET_SPAN, timestamp + timestamp, 100); + timestamp += BUCKET_SPAN; + } + + assertEquals(10, d.getBucketCount()); + d.flush(); + assertEquals(19, d.getBucketCount()); + } + + public void testWithLatencyGreaterThanTenBuckets() { + job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000)); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); + + long timestamp = 70000; + while (timestamp < 70000 + 20 * BUCKET_SPAN) { + sendManyDataPoints(d, timestamp - BUCKET_SPAN, timestamp + timestamp, 100); + timestamp += BUCKET_SPAN; + } + + assertEquals(6, d.getBucketCount()); + d.flush(); + assertEquals(19, d.getBucketCount()); + } + public void testEmptyBuckets() { DataStreamDiagnostics d = new DataStreamDiagnostics(job); @@ -329,6 +359,20 @@ public void testSparseBucketsLongerPeriod() { assertEquals(null, d.getLatestEmptyBucketTime()); } + private static Job createJob(TimeValue bucketSpan, TimeValue latency) { + AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); + acBuilder.setBucketSpan(bucketSpan); + if (latency != null) { + acBuilder.setLatency(latency); + } + acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + + Job.Builder builder = new Job.Builder("job_id"); + builder.setAnalysisConfig(acBuilder); + builder.setDataDescription(new DataDescription.Builder()); + return builder.build(new Date()); + } + public void testFlushAfterZeroRecords() { DataStreamDiagnostics d = new DataStreamDiagnostics(job); d.flush(); From 3b74f002c6d85e5c2d68ab653b3b0538c2ea0e3d Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 25 Apr 2018 18:32:56 +0100 Subject: [PATCH 3/4] Rename BucketHistogram to BucketDiagnostics --- .../{BucketHistogram.java => BucketDiagnostics.java} | 4 ++-- .../process/diagnostics/DataStreamDiagnostics.java | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/{BucketHistogram.java => BucketDiagnostics.java} (97%) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java similarity index 97% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java index 0120134349fe1..3d95c5b8b3f2b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketHistogram.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java @@ -17,7 +17,7 @@ * circular buffer. When time is advanced, all buckets * out of the window are flushed. */ -class BucketHistogram { +class BucketDiagnostics { private static final int MIN_BUCKETS = 10; @@ -33,7 +33,7 @@ class BucketHistogram { private long latestFlushedBucketStartMs = -1; private final BucketFlushListener bucketFlushListener; - public BucketHistogram(Job job, BucketFlushListener bucketFlushListener) { + public BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) { bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis(); latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis(); maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java index ecad635752e6b..a19f6eba02367 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java @@ -24,7 +24,7 @@ public class DataStreamDiagnostics { private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class); - private final BucketHistogram bucketHistogram; + private final BucketDiagnostics bucketDiagnostics; private long bucketCount = 0; private long emptyBucketCount = 0; @@ -33,10 +33,10 @@ public class DataStreamDiagnostics { private long latestSparseBucketTime = -1; public DataStreamDiagnostics(Job job) { - bucketHistogram = new BucketHistogram(job, createBucketFlushListener()); + bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener()); } - private BucketHistogram.BucketFlushListener createBucketFlushListener() { + private BucketDiagnostics.BucketFlushListener createBucketFlushListener() { return (flushedBucketStartMs, flushedBucketCount) -> { ++bucketCount; if (flushedBucketCount == 0) { @@ -45,7 +45,7 @@ private BucketHistogram.BucketFlushListener createBucketFlushListener() { } else { // simplistic way to calculate data sparsity, just take the log and // check the difference - double averageBucketSize = bucketHistogram.averageBucketCount(); + double averageBucketSize = bucketDiagnostics.averageBucketCount(); double logAverageBucketSize = Math.log(averageBucketSize); double logBucketSize = Math.log(flushedBucketCount); double sparsityScore = logAverageBucketSize - logBucketSize; @@ -67,7 +67,7 @@ private BucketHistogram.BucketFlushListener createBucketFlushListener() { * The record timestamp in milliseconds since epoch */ public void checkRecord(long recordTimestampInMs) { - bucketHistogram.addRecord(recordTimestampInMs); + bucketDiagnostics.addRecord(recordTimestampInMs); } /** @@ -75,7 +75,7 @@ public void checkRecord(long recordTimestampInMs) { */ public void flush() { // flush all we know - bucketHistogram.flush(); + bucketDiagnostics.flush(); } public long getBucketCount() { From 09186bc9bea333123a107db05dcde05f8d46e5ce Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 30 Apr 2018 13:31:09 +0100 Subject: [PATCH 4/4] Remove public qualifiers from package private class --- .../ml/job/process/diagnostics/BucketDiagnostics.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java index 3d95c5b8b3f2b..c61926dfb0426 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java @@ -33,7 +33,7 @@ class BucketDiagnostics { private long latestFlushedBucketStartMs = -1; private final BucketFlushListener bucketFlushListener; - public BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) { + BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) { bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis(); latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis(); maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS); @@ -41,7 +41,7 @@ public BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) { this.bucketFlushListener = bucketFlushListener; } - public void addRecord(long recordTimestampMs) { + void addRecord(long recordTimestampMs) { long bucketStartMs = Intervals.alignToFloor(recordTimestampMs, bucketSpanMs); // Initialize earliest/latest times @@ -103,7 +103,7 @@ private long getTimestampMs(int bucketIndex) { return latestBucketStartMs - offsetToLatest * bucketSpanMs; } - public void flush() { + void flush() { if (latestBucketStartMs < 0) { return; } @@ -115,7 +115,7 @@ public void flush() { } } - public double averageBucketCount() { + double averageBucketCount() { return (double) movingBucketCount / size(); } @@ -126,7 +126,7 @@ private int size() { return (int) ((latestBucketStartMs - earliestBucketStartMs) / bucketSpanMs) + 1; } - public interface BucketFlushListener { + interface BucketFlushListener { void onBucketFlush(long bucketStartMs, long bucketCounts); } }