From a2aafb4e1739b2e24bcc87e54ea1c5e52da26a40 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 5 Jul 2018 10:28:43 +0200 Subject: [PATCH 1/2] add duration statistics for forecast stats --- x-pack/docs/en/rest-api/ml/jobcounts.asciidoc | 3 ++ .../xpack/core/ml/stats/ForecastStats.java | 16 +++++-- .../core/ml/stats/ForecastStatsTests.java | 48 ++++++++++++++++--- .../xpack/ml/job/persistence/JobProvider.java | 11 ++++- 4 files changed, 67 insertions(+), 11 deletions(-) diff --git a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc index d343cc23ae0ad..7590a302201a2 100644 --- a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc +++ b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc @@ -202,6 +202,9 @@ The `forecasts_stats` object shows statistics about forecasts. It has the follow `processing_time_ms`:: (object) Statistics about the forecast runtime in milliseconds: minimum, maximum, average and total. +`duration_ms`:: + (object) Statistics about the forecast duration in milliseconds: minimum, maximum, average and total. + `status`:: (object) Counts per forecast status, for example: {"finished" : 2}. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java index d490e4b98a44a..98e9dd51f2722 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java @@ -28,6 +28,7 @@ public static class Fields { public static final String MEMORY = "memory_bytes"; public static final String RUNTIME = "processing_time_ms"; public static final String RECORDS = "records"; + public static final String DURATION = "duration_ms"; public static final String STATUSES = "status"; } @@ -36,6 +37,7 @@ public static class Fields { private StatsAccumulator memoryStats; private StatsAccumulator recordStats; private StatsAccumulator runtimeStats; + private StatsAccumulator durationStats; private CountAccumulator statusCounts; public ForecastStats() { @@ -44,6 +46,7 @@ public ForecastStats() { this.memoryStats = new StatsAccumulator(); this.recordStats = new StatsAccumulator(); this.runtimeStats = new StatsAccumulator(); + this.durationStats = new StatsAccumulator(); this.statusCounts = new CountAccumulator(); } @@ -51,12 +54,13 @@ public ForecastStats() { * Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it. */ public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats, - CountAccumulator statusCounts) { + StatsAccumulator durationStats, CountAccumulator statusCounts) { this.total = total; this.forecastedJobs = total > 0 ? 1 : 0; this.memoryStats = Objects.requireNonNull(memoryStats); this.recordStats = Objects.requireNonNull(recordStats); this.runtimeStats = Objects.requireNonNull(runtimeStats); + this.durationStats = Objects.requireNonNull(durationStats); this.statusCounts = Objects.requireNonNull(statusCounts); } @@ -66,6 +70,7 @@ public ForecastStats(StreamInput in) throws IOException { this.memoryStats = new StatsAccumulator(in); this.recordStats = new StatsAccumulator(in); this.runtimeStats = new StatsAccumulator(in); + this.durationStats = new StatsAccumulator(in); this.statusCounts = new CountAccumulator(in); } @@ -78,6 +83,7 @@ public ForecastStats merge(ForecastStats other) { memoryStats.merge(other.memoryStats); recordStats.merge(other.recordStats); runtimeStats.merge(other.runtimeStats); + durationStats.merge(other.durationStats); statusCounts.merge(other.statusCounts); return this; @@ -98,6 +104,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th builder.field(Fields.MEMORY, memoryStats.asMap()); builder.field(Fields.RECORDS, recordStats.asMap()); builder.field(Fields.RUNTIME, runtimeStats.asMap()); + builder.field(Fields.DURATION, durationStats.asMap()); builder.field(Fields.STATUSES, statusCounts.asMap()); } @@ -113,6 +120,7 @@ public Map asMap() { map.put(Fields.MEMORY, memoryStats.asMap()); map.put(Fields.RECORDS, recordStats.asMap()); map.put(Fields.RUNTIME, runtimeStats.asMap()); + map.put(Fields.DURATION, durationStats.asMap()); map.put(Fields.STATUSES, statusCounts.asMap()); } @@ -126,12 +134,13 @@ public void writeTo(StreamOutput out) throws IOException { memoryStats.writeTo(out); recordStats.writeTo(out); runtimeStats.writeTo(out); + durationStats.writeTo(out); statusCounts.writeTo(out); } @Override public int hashCode() { - return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts); + return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, durationStats, statusCounts); } @Override @@ -147,6 +156,7 @@ public boolean equals(Object obj) { ForecastStats other = (ForecastStats) obj; return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs) && Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats) - && Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts); + && Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(durationStats, other.durationStats) + && Objects.equals(statusCounts, other.statusCounts); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java index f7f5d16c5e578..81b4d83737815 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java @@ -36,6 +36,7 @@ public void testEmpty() throws IOException { assertFalse(properties.containsKey(Fields.RECORDS)); assertFalse(properties.containsKey(Fields.RUNTIME)); assertFalse(properties.containsKey(Fields.STATUSES)); + assertFalse(properties.containsKey(Fields.DURATION)); } public void testMerge() { @@ -58,7 +59,12 @@ public void testMerge() { statusStats.add("finished", 2L); statusStats.add("failed", 5L); - ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats); + StatsAccumulator durationStats = new StatsAccumulator(); + durationStats.add(96); + durationStats.add(192); + durationStats.add(96); + + ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, durationStats, statusStats); StatsAccumulator memoryStats2 = new StatsAccumulator(); memoryStats2.add(10); @@ -76,7 +82,11 @@ public void testMerge() { statusStats2.add("finished", 2L); statusStats2.add("scheduled", 1L); - ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2); + StatsAccumulator durationStats2 = new StatsAccumulator(); + durationStats2.add(192); + durationStats2.add(192); + + ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, durationStats2, statusStats2); forecastStats.merge(forecastStats2); @@ -117,6 +127,14 @@ public void testMerge() { assertEquals(4, mergedCountStats.get("finished").longValue()); assertEquals(5, mergedCountStats.get("failed").longValue()); assertEquals(1, mergedCountStats.get("scheduled").longValue()); + + @SuppressWarnings("unchecked") + Map mergedDurationStats = (Map) mergedStats.get(Fields.DURATION); + + assertTrue(mergedDurationStats != null); + assertThat(mergedDurationStats.get(StatsAccumulator.Fields.AVG), equalTo(153.6)); + assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MAX), equalTo(192.0)); + assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MIN), equalTo(96.0)); } public void testChainedMerge() { @@ -135,7 +153,12 @@ public void testChainedMerge() { CountAccumulator statusStats = new CountAccumulator(); statusStats.add("finished", 2L); statusStats.add("failed", 5L); - ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats); + StatsAccumulator durationStats = new StatsAccumulator(); + durationStats.add(96); + durationStats.add(192); + durationStats.add(96); + + ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, durationStats, statusStats); StatsAccumulator memoryStats2 = new StatsAccumulator(); memoryStats2.add(10); @@ -149,7 +172,10 @@ public void testChainedMerge() { CountAccumulator statusStats2 = new CountAccumulator(); statusStats2.add("finished", 2L); statusStats2.add("scheduled", 1L); - ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2); + StatsAccumulator durationStats2 = new StatsAccumulator(); + durationStats2.add(192); + durationStats2.add(192); + ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, durationStats2, statusStats2); StatsAccumulator memoryStats3 = new StatsAccumulator(); memoryStats3.add(500); @@ -159,7 +185,9 @@ public void testChainedMerge() { runtimeStats3.add(32); CountAccumulator statusStats3 = new CountAccumulator(); statusStats3.add("finished", 1L); - ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, statusStats3); + StatsAccumulator durationStats3 = new StatsAccumulator(); + durationStats3.add(282); + ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, durationStats3, statusStats3); ForecastStats forecastStats4 = new ForecastStats(); @@ -209,6 +237,14 @@ public void testChainedMerge() { assertEquals(5, mergedCountStats.get("finished").longValue()); assertEquals(5, mergedCountStats.get("failed").longValue()); assertEquals(1, mergedCountStats.get("scheduled").longValue()); + + @SuppressWarnings("unchecked") + Map mergedDurationStats = (Map) mergedStats.get(Fields.DURATION); + + assertTrue(mergedDurationStats != null); + assertThat(mergedDurationStats.get(StatsAccumulator.Fields.AVG), equalTo(175.0)); + assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MAX), equalTo(282.0)); + assertThat(mergedDurationStats.get(StatsAccumulator.Fields.MIN), equalTo(96.0)); } public void testUniqueCountOfJobs() { @@ -238,7 +274,7 @@ protected Reader instanceReader() { public ForecastStats createForecastStats(long minTotal, long maxTotal) { ForecastStats forecastStats = new ForecastStats(randomLongBetween(minTotal, maxTotal), createStatsAccumulator(), - createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator()); + createStatsAccumulator(), createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator()); return forecastStats; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 7513cb5a5bbc0..a3f5185a03493 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -59,6 +59,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; @@ -98,6 +99,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.core.ml.stats.CountAccumulator; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats.Fields; import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils; @@ -1135,6 +1137,9 @@ public void getForecastStats(String jobId, Consumer handler, Cons .field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); sourceBuilder.aggregation( AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); + Script durationScript = new Script( + "doc['forecast_end_timestamp'].value.getMillis() - doc['forecast_start_timestamp'].value.getMillis()"); + sourceBuilder.aggregation(AggregationBuilders.stats(Fields.DURATION).script(durationScript)); sourceBuilder.aggregation( AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName())); sourceBuilder.size(0); @@ -1156,10 +1161,12 @@ public void getForecastStats(String jobId, Consumer handler, Cons .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RECORDS)); StatsAccumulator runtimeStats = StatsAccumulator .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RUNTIME)); + StatsAccumulator durationStats = StatsAccumulator + .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.DURATION)); CountAccumulator statusCount = CountAccumulator .fromTermsAggregation((StringTerms) aggregationsAsMap.get(ForecastStats.Fields.STATUSES)); - - ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, runtimeStats, statusCount); + ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, runtimeStats, durationStats, + statusCount); handler.accept(forecastStats); }, errorHandler), client::search); From da247ff72d2cdd04b7705c961267a6fac84dcbf7 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 5 Jul 2018 10:48:02 +0200 Subject: [PATCH 2/2] use constants --- .../elasticsearch/xpack/ml/job/persistence/JobProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index a3f5185a03493..3a8f187fde645 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -1137,8 +1137,8 @@ public void getForecastStats(String jobId, Consumer handler, Cons .field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); sourceBuilder.aggregation( AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); - Script durationScript = new Script( - "doc['forecast_end_timestamp'].value.getMillis() - doc['forecast_start_timestamp'].value.getMillis()"); + Script durationScript = new Script("doc['" + ForecastRequestStats.END_TIME + "'].value.getMillis() - doc['" + + ForecastRequestStats.START_TIME + "'].value.getMillis()"); sourceBuilder.aggregation(AggregationBuilders.stats(Fields.DURATION).script(durationScript)); sourceBuilder.aggregation( AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName()));