From 11f33d525ad090a738357008f9a9537b439466c7 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Fri, 26 Jul 2019 15:14:20 +0200 Subject: [PATCH 1/2] Persist DatafeedTiminStats with RefreshPolicy.NONE by default --- .../TransportPreviewDatafeedAction.java | 9 +- .../action/TransportStartDatafeedAction.java | 10 +- .../xpack/ml/datafeed/DatafeedJob.java | 4 + .../xpack/ml/datafeed/DatafeedJobBuilder.java | 12 +- .../xpack/ml/datafeed/DatafeedManager.java | 1 + .../datafeed/DatafeedTimingStatsReporter.java | 29 ++-- .../DatafeedTimingStatsReporterTests.java | 133 +++++++++++------- .../AggregationDataExtractorTests.java | 4 +- .../chunked/ChunkedDataExtractorTests.java | 4 +- .../scroll/ScrollDataExtractorTests.java | 4 +- .../persistence/TimingStatsReporterTests.java | 13 +- 11 files changed, 137 insertions(+), 86 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index d3ba2db506a95..ded682d2230be 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -21,10 +21,10 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import java.io.BufferedReader; @@ -42,21 +42,19 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction() { @Override public void onResponse(DataExtractorFactory dataExtractorFactory) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3d310423866d5..8a3f2e35e1dff 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -53,10 +53,10 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.IOException; @@ -83,7 +83,6 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction persistentTasksService.sendStartRequest( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 18d724313e325..8837d3f03f9ec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -107,6 +107,10 @@ public String getJobId() { return jobId; } + public void finishReportingTimingStats() { + timingStatsReporter.finishReporting(); + } + Long runLookBack(long startTime, Long endTime) throws Exception { lookbackStartTimeMs = skipToStartTime(startTime); Optional endMs = Optional.ofNullable(endTime); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index 778e211640279..b94a0d1f41910 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -7,6 +7,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -19,6 +20,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -101,8 +103,14 @@ void build(String datafeedId, ActionListener listener) { ); // Create data extractor factory - Consumer datafeedTimingStatsHandler = timingStats -> { - context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister); + Consumer datafeedTimingStatsHandler = initialTimingStats -> { + DatafeedTimingStatsPersister timingStatsPersister = new DatafeedTimingStatsPersister() { + @Override + public void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) { + jobResultsPersister.persistDatafeedTimingStats(timingStats, refreshPolicy); + } + }; + context.timingStatsReporter = new DatafeedTimingStatsReporter(initialTimingStats, timingStatsPersister); DataExtractorFactory.create( client, datafeedConfigHolder.get(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 53568c3705a8d..4425b624a06fe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -347,6 +347,7 @@ public void stop(String source, TimeValue timeout, Exception e) { } auditor.info(datafeedJob.getJobId(), Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED)); + datafeedJob.finishReportingTimingStats(); finishHandler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java index 7df3919c459b3..0994565ffde52 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java @@ -9,7 +9,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import java.util.Objects; @@ -21,20 +20,27 @@ */ public class DatafeedTimingStatsReporter { + /** Interface used for persisting current timing stats to the results index. */ + public interface DatafeedTimingStatsPersister { + /** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */ + default void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) {} + } + /** Persisted timing stats. May be stale. */ private DatafeedTimingStats persistedTimingStats; /** Current timing stats. */ private volatile DatafeedTimingStats currentTimingStats; /** Object used to persist current timing stats. */ - private final JobResultsPersister jobResultsPersister; + private final DatafeedTimingStatsPersister persister; - public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, JobResultsPersister jobResultsPersister) { + public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) { Objects.requireNonNull(timingStats); this.persistedTimingStats = new DatafeedTimingStats(timingStats); this.currentTimingStats = new DatafeedTimingStats(timingStats); - this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister); + this.persister = Objects.requireNonNull(persister); } + /** Gets current timing stats. */ public DatafeedTimingStats getCurrentTimingStats() { return new DatafeedTimingStats(currentTimingStats); } @@ -64,16 +70,23 @@ public void reportDataCounts(DataCounts dataCounts) { flushIfDifferSignificantly(); } + /** Finishes reporting of timing stats. Makes timing stats persisted immediately. */ + public void finishReporting() { + // Don't flush if current timing stats are identical to the persisted ones + if (currentTimingStats.equals(persistedTimingStats) == false) { + flush(WriteRequest.RefreshPolicy.IMMEDIATE); + } + } + private void flushIfDifferSignificantly() { if (differSignificantly(currentTimingStats, persistedTimingStats)) { - flush(); + flush(WriteRequest.RefreshPolicy.NONE); } } - private void flush() { + private void flush(WriteRequest.RefreshPolicy refreshPolicy) { persistedTimingStats = new DatafeedTimingStats(currentTimingStats); - // TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action - jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE); + persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java index 6daa0f5a0b842..fff735418be5c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java @@ -11,10 +11,13 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.junit.Before; import org.mockito.InOrder; +import java.sql.Date; +import java.time.Instant; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.inOrder; @@ -26,93 +29,107 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase { private static final String JOB_ID = "my-job-id"; + private static final Instant TIMESTAMP = Instant.ofEpochMilli(1000000000); private static final TimeValue ONE_SECOND = TimeValue.timeValueSeconds(1); - private JobResultsPersister jobResultsPersister; + private DatafeedTimingStatsPersister timingStatsPersister; @Before public void setUpTests() { - jobResultsPersister = mock(JobResultsPersister.class); + timingStatsPersister = mock(DatafeedTimingStatsPersister.class); } public void testReportSearchDuration_Null() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - timingStatsReporter.reportSearchDuration(null); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + reporter.reportSearchDuration(null); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - verifyZeroInteractions(jobResultsPersister); + verifyZeroInteractions(timingStatsPersister); } public void testReportSearchDuration_Zero() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0))); - timingStatsReporter.reportSearchDuration(TimeValue.ZERO); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0))); + reporter.reportSearchDuration(TimeValue.ZERO); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0))); - verify(jobResultsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.IMMEDIATE); - verifyNoMoreInteractions(jobResultsPersister); + verify(timingStatsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.NONE); + verifyNoMoreInteractions(timingStatsPersister); } public void testReportSearchDuration() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 14, 10, 11000.0, 11000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 14, 10, 11000.0, 11000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 16, 10, 13000.0, 13000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 16, 10, 13000.0, 13000.0))); - timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0))); + reporter.reportSearchDuration(ONE_SECOND); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0))); - InOrder inOrder = inOrder(jobResultsPersister); - inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.IMMEDIATE); - inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.IMMEDIATE); - verifyNoMoreInteractions(jobResultsPersister); + InOrder inOrder = inOrder(timingStatsPersister); + inOrder.verify(timingStatsPersister).persistDatafeedTimingStats( + createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.NONE); + inOrder.verify(timingStatsPersister).persistDatafeedTimingStats( + createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.NONE); + verifyNoMoreInteractions(timingStatsPersister); } public void testReportDataCounts_Null() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - timingStatsReporter.reportDataCounts(null); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + reporter.reportDataCounts(null); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); - verifyZeroInteractions(jobResultsPersister); + verifyZeroInteractions(timingStatsPersister); } public void testReportDataCounts() { - DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0))); + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0))); - timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 21, 10000.0))); + reporter.reportDataCounts(createDataCounts(1)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 21, 10000.0))); - timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 22, 10000.0))); + reporter.reportDataCounts(createDataCounts(1)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 22, 10000.0))); - timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0))); + reporter.reportDataCounts(createDataCounts(1)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0))); - InOrder inOrder = inOrder(jobResultsPersister); - inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE); - verifyNoMoreInteractions(jobResultsPersister); + InOrder inOrder = inOrder(timingStatsPersister); + inOrder.verify(timingStatsPersister).persistDatafeedTimingStats( + createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.NONE); + verifyNoMoreInteractions(timingStatsPersister); } - private static DataCounts createDataCountsWithBucketCount(long bucketCount) { - DataCounts dataCounts = new DataCounts(JOB_ID); - dataCounts.incrementBucketCount(bucketCount); - return dataCounts; + public void testFinishReporting_NoChange() { + DatafeedTimingStatsReporter reporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + reporter.reportDataCounts(createDataCounts(0)); + reporter.finishReporting(); + + verifyZeroInteractions(timingStatsPersister); + } + + public void testFinishReporting_WithChange() { + DatafeedTimingStatsReporter reporter = createReporter(new DatafeedTimingStats(JOB_ID)); + reporter.reportDataCounts(createDataCounts(0, TIMESTAMP)); + reporter.finishReporting(); + + verify(timingStatsPersister).persistDatafeedTimingStats( + new DatafeedTimingStats(JOB_ID, 0, 0, 0.0, new ExponentialAverageCalculationContext(0.0, TIMESTAMP, null)), + RefreshPolicy.IMMEDIATE); + verifyNoMoreInteractions(timingStatsPersister); } public void testTimingStatsDifferSignificantly() { @@ -151,7 +168,7 @@ public void testTimingStatsDifferSignificantly() { } private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) { - return new DatafeedTimingStatsReporter(timingStats, jobResultsPersister); + return new DatafeedTimingStatsReporter(timingStats, timingStatsPersister); } private static DatafeedTimingStats createDatafeedTimingStats( @@ -171,4 +188,16 @@ private static DatafeedTimingStats createDatafeedTimingStats( ExponentialAverageCalculationContext context = new ExponentialAverageCalculationContext(incrementalSearchTimeMs, null, null); return new DatafeedTimingStats(jobId, searchCount, bucketCount, totalSearchTimeMs, context); } + + private static DataCounts createDataCounts(long bucketCount, Instant latestRecordTimestamp) { + DataCounts dataCounts = createDataCounts(bucketCount); + dataCounts.setLatestRecordTimeStamp(Date.from(latestRecordTimestamp)); + return dataCounts; + } + + private static DataCounts createDataCounts(long bucketCount) { + DataCounts dataCounts = new DataCounts(JOB_ID); + dataCounts.incrementBucketCount(bucketCount); + return dataCounts; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 6d9db043755ad..537a6b44d0bc6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -20,8 +20,8 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import java.io.BufferedReader; @@ -93,7 +93,7 @@ public void setUpTests() { .addAggregator(AggregationBuilders.histogram("time").field("time").interval(1000).subAggregation( AggregationBuilders.terms("airline").field("airline").subAggregation( AggregationBuilders.avg("responsetime").field("responsetime")))); - timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class)); + timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class)); } public void testExtraction() throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index 77ebc9651ddd7..e7ac362543945 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -26,8 +26,8 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import java.io.IOException; @@ -93,7 +93,7 @@ public void setUpTests() { scrollSize = 1000; chunkSpan = null; dataExtractorFactory = mock(DataExtractorFactory.class); - timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class)); + timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class)); } public void testExtractionGivenNoData() throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index f4074b0f5b46d..c383cf20b187e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -31,9 +31,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -146,7 +146,7 @@ public void setUpTests() { clearScrollFuture = mock(ActionFuture.class); capturedClearScrollRequests = ArgumentCaptor.forClass(ClearScrollRequest.class); when(client.execute(same(ClearScrollAction.INSTANCE), capturedClearScrollRequests.capture())).thenReturn(clearScrollFuture); - timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class)); + timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class)); } public void testSinglePageExtraction() throws IOException { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java index 9e1e5646e115d..d8a4d5aec24b2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; public class TimingStatsReporterTests extends ESTestCase { @@ -61,7 +62,7 @@ public void testReporting() { InOrder inOrder = inOrder(bulkResultsPersister); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1, 30.0)); - inOrder.verifyNoMoreInteractions(); + verifyNoMoreInteractions(bulkResultsPersister); } public void testFinishReporting() { @@ -83,25 +84,23 @@ public void testFinishReporting() { InOrder inOrder = inOrder(bulkResultsPersister); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0, 30.0)); - inOrder.verifyNoMoreInteractions(); + verifyNoMoreInteractions(bulkResultsPersister); } - public void testFinishReportingNoChange() { + public void testFinishReporting_NoChange() { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); - reporter.finishReporting(); verifyZeroInteractions(bulkResultsPersister); } - public void testFinishReportingWithChange() { + public void testFinishReporting_WithChange() { TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); - reporter.reportBucket(createBucket(10)); - reporter.finishReporting(); verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); + verifyNoMoreInteractions(bulkResultsPersister); } public void testTimingStatsDifferSignificantly() { From 5b6f65323b969e1f487c9f62fe434b88c8d05630 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Thu, 1 Aug 2019 09:58:14 +0200 Subject: [PATCH 2/2] Apply review comments --- .../ml/action/TransportPreviewDatafeedAction.java | 3 +-- .../xpack/ml/action/TransportStartDatafeedAction.java | 3 +-- .../xpack/ml/datafeed/DatafeedJobBuilder.java | 11 ++--------- .../ml/datafeed/DatafeedTimingStatsReporter.java | 3 ++- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index ded682d2230be..e7e182b15c11d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -21,7 +21,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; -import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -82,7 +81,7 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio jobBuilder.build(), xContentRegistry, // Fake DatafeedTimingStatsReporter that does not have access to results index - new DatafeedTimingStatsReporter(timingStats, new DatafeedTimingStatsPersister() {}), + new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}), new ActionListener<>() { @Override public void onResponse(DataExtractorFactory dataExtractorFactory) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 8a3f2e35e1dff..16f311d2b1666 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -53,7 +53,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; -import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -250,7 +249,7 @@ private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeed job, xContentRegistry, // Fake DatafeedTimingStatsReporter that does not have access to results index - new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), new DatafeedTimingStatsPersister() {}), + new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), (ts, refreshPolicy) -> {}), ActionListener.wrap( unused -> persistentTasksService.sendStartRequest( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index b94a0d1f41910..a592f41022654 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -7,7 +7,6 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -20,7 +19,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Result; -import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -104,13 +102,8 @@ void build(String datafeedId, ActionListener listener) { // Create data extractor factory Consumer datafeedTimingStatsHandler = initialTimingStats -> { - DatafeedTimingStatsPersister timingStatsPersister = new DatafeedTimingStatsPersister() { - @Override - public void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) { - jobResultsPersister.persistDatafeedTimingStats(timingStats, refreshPolicy); - } - }; - context.timingStatsReporter = new DatafeedTimingStatsReporter(initialTimingStats, timingStatsPersister); + context.timingStatsReporter = + new DatafeedTimingStatsReporter(initialTimingStats, jobResultsPersister::persistDatafeedTimingStats); DataExtractorFactory.create( client, datafeedConfigHolder.get(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java index 0994565ffde52..283b667f7b86d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java @@ -21,9 +21,10 @@ public class DatafeedTimingStatsReporter { /** Interface used for persisting current timing stats to the results index. */ + @FunctionalInterface public interface DatafeedTimingStatsPersister { /** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */ - default void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) {} + void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy); } /** Persisted timing stats. May be stale. */