diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java index 9f9215e5046fe..4e0c67ffd0f89 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java @@ -39,6 +39,7 @@ public class DatafeedTimingStats implements ToXContentObject { public static final ParseField BUCKET_COUNT = new ParseField("bucket_count"); public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms"); public static final ParseField AVG_SEARCH_TIME_PER_BUCKET_MS = new ParseField("average_search_time_per_bucket_ms"); + public static final ParseField EXPONENTIAL_AVG_SEARCH_TIME_PER_HOUR_MS = new ParseField("exponential_average_search_time_per_hour_ms"); public static final ParseField TYPE = new ParseField("datafeed_timing_stats"); @@ -55,18 +56,21 @@ private static ConstructingObjectParser createParser( Long bucketCount = (Long) args[2]; Double totalSearchTimeMs = (Double) args[3]; Double avgSearchTimePerBucketMs = (Double) args[4]; + Double exponentialAvgSearchTimePerHourMs = (Double) args[5]; return new DatafeedTimingStats( jobId, getOrDefault(searchCount, 0L), getOrDefault(bucketCount, 0L), getOrDefault(totalSearchTimeMs, 0.0), - avgSearchTimePerBucketMs); + avgSearchTimePerBucketMs, + exponentialAvgSearchTimePerHourMs); }); parser.declareString(constructorArg(), JOB_ID); parser.declareLong(optionalConstructorArg(), SEARCH_COUNT); parser.declareLong(optionalConstructorArg(), BUCKET_COUNT); parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS); parser.declareDouble(optionalConstructorArg(), AVG_SEARCH_TIME_PER_BUCKET_MS); + parser.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_SEARCH_TIME_PER_HOUR_MS); return parser; } @@ -75,14 +79,21 @@ private static ConstructingObjectParser createParser( private long bucketCount; private double totalSearchTimeMs; private Double avgSearchTimePerBucketMs; + private Double exponentialAvgSearchTimePerHourMs; public DatafeedTimingStats( - String jobId, long searchCount, long bucketCount, double totalSearchTimeMs, @Nullable Double avgSearchTimePerBucketMs) { + String jobId, + long searchCount, + long bucketCount, + double totalSearchTimeMs, + @Nullable Double avgSearchTimePerBucketMs, + @Nullable Double exponentialAvgSearchTimePerHourMs) { this.jobId = Objects.requireNonNull(jobId); this.searchCount = searchCount; this.bucketCount = bucketCount; this.totalSearchTimeMs = totalSearchTimeMs; this.avgSearchTimePerBucketMs = avgSearchTimePerBucketMs; + this.exponentialAvgSearchTimePerHourMs = exponentialAvgSearchTimePerHourMs; } public String getJobId() { @@ -105,6 +116,10 @@ public Double getAvgSearchTimePerBucketMs() { return avgSearchTimePerBucketMs; } + public Double getExponentialAvgSearchTimePerHourMs() { + return exponentialAvgSearchTimePerHourMs; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); @@ -115,6 +130,9 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par if (avgSearchTimePerBucketMs != null) { builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), avgSearchTimePerBucketMs); } + if (exponentialAvgSearchTimePerHourMs != null) { + builder.field(EXPONENTIAL_AVG_SEARCH_TIME_PER_HOUR_MS.getPreferredName(), exponentialAvgSearchTimePerHourMs); + } builder.endObject(); return builder; } @@ -133,12 +151,19 @@ public boolean equals(Object obj) { && this.searchCount == other.searchCount && this.bucketCount == other.bucketCount && this.totalSearchTimeMs == other.totalSearchTimeMs - && Objects.equals(this.avgSearchTimePerBucketMs, other.avgSearchTimePerBucketMs); + && Objects.equals(this.avgSearchTimePerBucketMs, other.avgSearchTimePerBucketMs) + && Objects.equals(this.exponentialAvgSearchTimePerHourMs, other.exponentialAvgSearchTimePerHourMs); } @Override public int hashCode() { - return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs, avgSearchTimePerBucketMs); + return Objects.hash( + jobId, + searchCount, + bucketCount, + totalSearchTimeMs, + avgSearchTimePerBucketMs, + exponentialAvgSearchTimePerHourMs); } @Override diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java index 9493270c4b936..7e722ddc8273d 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java @@ -45,6 +45,8 @@ public class TimingStats implements ToXContentObject { public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms"); public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("exponential_average_bucket_processing_time_ms"); + public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_PER_HOUR_MS = + new ParseField("exponential_average_bucket_processing_time_per_hour_ms"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( @@ -58,6 +60,7 @@ public class TimingStats implements ToXContentObject { Double maxBucketProcessingTimeMs = (Double) args[4]; Double avgBucketProcessingTimeMs = (Double) args[5]; Double exponentialAvgBucketProcessingTimeMs = (Double) args[6]; + Double exponentialAvgBucketProcessingTimePerHourMs = (Double) args[7]; return new TimingStats( jobId, getOrDefault(bucketCount, 0L), @@ -65,7 +68,8 @@ public class TimingStats implements ToXContentObject { minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs, - exponentialAvgBucketProcessingTimeMs); + exponentialAvgBucketProcessingTimeMs, + exponentialAvgBucketProcessingTimePerHourMs); }); static { @@ -76,6 +80,7 @@ public class TimingStats implements ToXContentObject { PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS); PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS); PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS); + PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_PER_HOUR_MS); } private final String jobId; @@ -85,6 +90,7 @@ public class TimingStats implements ToXContentObject { private Double maxBucketProcessingTimeMs; private Double avgBucketProcessingTimeMs; private Double exponentialAvgBucketProcessingTimeMs; + private Double exponentialAvgBucketProcessingTimePerHourMs; public TimingStats( String jobId, @@ -93,7 +99,8 @@ public TimingStats( @Nullable Double minBucketProcessingTimeMs, @Nullable Double maxBucketProcessingTimeMs, @Nullable Double avgBucketProcessingTimeMs, - @Nullable Double exponentialAvgBucketProcessingTimeMs) { + @Nullable Double exponentialAvgBucketProcessingTimeMs, + @Nullable Double exponentialAvgBucketProcessingTimePerHourMs) { this.jobId = jobId; this.bucketCount = bucketCount; this.totalBucketProcessingTimeMs = totalBucketProcessingTimeMs; @@ -101,6 +108,7 @@ public TimingStats( this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs; this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs; this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs; + this.exponentialAvgBucketProcessingTimePerHourMs = exponentialAvgBucketProcessingTimePerHourMs; } public String getJobId() { @@ -131,6 +139,10 @@ public Double getExponentialAvgBucketProcessingTimeMs() { return exponentialAvgBucketProcessingTimeMs; } + public Double getExponentialAvgBucketProcessingTimePerHourMs() { + return exponentialAvgBucketProcessingTimePerHourMs; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); @@ -149,6 +161,10 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par if (exponentialAvgBucketProcessingTimeMs != null) { builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs); } + if (exponentialAvgBucketProcessingTimePerHourMs != null) { + builder.field( + EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_PER_HOUR_MS.getPreferredName(), exponentialAvgBucketProcessingTimePerHourMs); + } builder.endObject(); return builder; } @@ -164,7 +180,8 @@ public boolean equals(Object o) { && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs) && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs) && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs) - && Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs); + && Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs) + && Objects.equals(this.exponentialAvgBucketProcessingTimePerHourMs, that.exponentialAvgBucketProcessingTimePerHourMs); } @Override @@ -176,7 +193,8 @@ public int hashCode() { minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs, - exponentialAvgBucketProcessingTimeMs); + exponentialAvgBucketProcessingTimeMs, + exponentialAvgBucketProcessingTimePerHourMs); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java index cde92b78f6c16..bf3c9581890c3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java @@ -35,7 +35,12 @@ public class DatafeedTimingStatsTests extends AbstractXContentTestCase createParser( Long searchCount = (Long) args[1]; Long bucketCount = (Long) args[2]; Double totalSearchTimeMs = (Double) args[3]; + ExponentialAverageCalculationContext exponentialAvgCalculationContext = (ExponentialAverageCalculationContext) args[4]; return new DatafeedTimingStats( - jobId, getOrDefault(searchCount, 0L), getOrDefault(bucketCount, 0L), getOrDefault(totalSearchTimeMs, 0.0)); + jobId, + getOrDefault(searchCount, 0L), + getOrDefault(bucketCount, 0L), + getOrDefault(totalSearchTimeMs, 0.0), + getOrDefault(exponentialAvgCalculationContext, new ExponentialAverageCalculationContext())); }); parser.declareString(constructorArg(), JOB_ID); parser.declareLong(optionalConstructorArg(), SEARCH_COUNT); parser.declareLong(optionalConstructorArg(), BUCKET_COUNT); parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS); + parser.declareObject(optionalConstructorArg(), ExponentialAverageCalculationContext.PARSER, EXPONENTIAL_AVG_CALCULATION_CONTEXT); return parser; } @@ -64,27 +74,40 @@ public static String documentId(String jobId) { private long searchCount; private long bucketCount; private double totalSearchTimeMs; - - public DatafeedTimingStats(String jobId, long searchCount, long bucketCount, double totalSearchTimeMs) { + private final ExponentialAverageCalculationContext exponentialAvgCalculationContext; + + public DatafeedTimingStats( + String jobId, + long searchCount, + long bucketCount, + double totalSearchTimeMs, + ExponentialAverageCalculationContext exponentialAvgCalculationContext) { this.jobId = Objects.requireNonNull(jobId); this.searchCount = searchCount; this.bucketCount = bucketCount; this.totalSearchTimeMs = totalSearchTimeMs; + this.exponentialAvgCalculationContext = Objects.requireNonNull(exponentialAvgCalculationContext); } public DatafeedTimingStats(String jobId) { - this(jobId, 0, 0, 0.0); + this(jobId, 0, 0, 0.0, new ExponentialAverageCalculationContext()); } public DatafeedTimingStats(StreamInput in) throws IOException { - jobId = in.readString(); - searchCount = in.readLong(); - bucketCount = in.readLong(); - totalSearchTimeMs = in.readDouble(); + this.jobId = in.readString(); + this.searchCount = in.readLong(); + this.bucketCount = in.readLong(); + this.totalSearchTimeMs = in.readDouble(); + this.exponentialAvgCalculationContext = in.readOptionalWriteable(ExponentialAverageCalculationContext::new); } public DatafeedTimingStats(DatafeedTimingStats other) { - this(other.jobId, other.searchCount, other.bucketCount, other.totalSearchTimeMs); + this( + other.jobId, + other.searchCount, + other.bucketCount, + other.totalSearchTimeMs, + new ExponentialAverageCalculationContext(other.exponentialAvgCalculationContext)); } public String getJobId() { @@ -104,26 +127,40 @@ public double getTotalSearchTimeMs() { } public Double getAvgSearchTimePerBucketMs() { - return bucketCount > 0 - ? totalSearchTimeMs / bucketCount - : null; + if (bucketCount == 0) return null; + return totalSearchTimeMs / bucketCount; + } + + public Double getExponentialAvgSearchTimePerHourMs() { + return exponentialAvgCalculationContext.getCurrentExponentialAverageMs(); + } + + // Visible for testing + ExponentialAverageCalculationContext getExponentialAvgCalculationContext() { + return exponentialAvgCalculationContext; } - public void incrementTotalSearchTimeMs(double searchTimeMs) { + public void incrementSearchTimeMs(double searchTimeMs) { this.searchCount++; this.totalSearchTimeMs += searchTimeMs; + this.exponentialAvgCalculationContext.increment(searchTimeMs); } public void incrementBucketCount(long bucketCount) { this.bucketCount += bucketCount; } + public void setLatestRecordTimestamp(Instant latestRecordTimestamp) { + this.exponentialAvgCalculationContext.setLatestTimestamp(latestRecordTimestamp); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(jobId); out.writeLong(searchCount); out.writeLong(bucketCount); out.writeDouble(totalSearchTimeMs); + out.writeOptionalWriteable(exponentialAvgCalculationContext); } @Override @@ -137,11 +174,18 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs); if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) { - Double avgSearchTimePerBucket = getAvgSearchTimePerBucketMs(); - if (avgSearchTimePerBucket != null) { - builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), getAvgSearchTimePerBucketMs()); + Double avgSearchTimePerBucketMs = getAvgSearchTimePerBucketMs(); + if (avgSearchTimePerBucketMs != null) { + builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), avgSearchTimePerBucketMs); + } + Double expAvgSearchTimePerHourMs = getExponentialAvgSearchTimePerHourMs(); + if (expAvgSearchTimePerHourMs != null) { + builder.field(EXPONENTIAL_AVG_SEARCH_TIME_PER_HOUR_MS.getPreferredName(), expAvgSearchTimePerHourMs); } } + if (params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false)) { + builder.field(EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), exponentialAvgCalculationContext); + } builder.endObject(); return builder; } @@ -159,12 +203,18 @@ public boolean equals(Object obj) { return Objects.equals(this.jobId, other.jobId) && this.searchCount == other.searchCount && this.bucketCount == other.bucketCount - && this.totalSearchTimeMs == other.totalSearchTimeMs; + && this.totalSearchTimeMs == other.totalSearchTimeMs + && Objects.equals(this.exponentialAvgCalculationContext, other.exponentialAvgCalculationContext); } @Override public int hashCode() { - return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs); + return Objects.hash( + jobId, + searchCount, + bucketCount, + totalSearchTimeMs, + exponentialAvgCalculationContext); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 6981772066b96..baf655a280d67 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -58,6 +58,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.core.ml.job.results.Result; import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import java.io.IOException; import java.util.ArrayList; @@ -931,12 +932,27 @@ private static void addTimingStatsExceptBucketCountMapping(XContentBuilder build .endObject() .startObject(TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName()) .field(TYPE, DOUBLE) + .endObject() + .startObject(TimingStats.EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName()) + .startObject(PROPERTIES) + .startObject(ExponentialAverageCalculationContext.INCREMENTAL_METRIC_VALUE_MS.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() + .startObject(ExponentialAverageCalculationContext.LATEST_TIMESTAMP.getPreferredName()) + .field(TYPE, DATE) + .endObject() + .startObject(ExponentialAverageCalculationContext.PREVIOUS_EXPONENTIAL_AVERAGE_MS.getPreferredName()) + .field(TYPE, DOUBLE) + .endObject() + .endObject() .endObject(); } /** * {@link DatafeedTimingStats} mapping. * Does not include mapping for BUCKET_COUNT as this mapping is added by {@link #addDataCountsMapping} method. + * Does not include mapping for EXPONENTIAL_AVG_CALCULATION_CONTEXT as this mapping is added by + * {@link #addTimingStatsExceptBucketCountMapping} method. * * @throws IOException On builder write error */ @@ -948,6 +964,7 @@ private static void addDatafeedTimingStats(XContentBuilder builder) throws IOExc // re-used: BUCKET_COUNT .startObject(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName()) .field(TYPE, DOUBLE) + // re-used: EXPONENTIAL_AVG_CALCULATION_CONTEXT .endObject(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java index a99260e668685..fbf08cbc6d760 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.job.process.autodetect.state; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; @@ -16,9 +17,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; +import java.time.Instant; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -36,6 +39,9 @@ public class TimingStats implements ToXContentObject, Writeable { public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms"); public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("exponential_average_bucket_processing_time_ms"); + public static final ParseField EXPONENTIAL_AVG_CALCULATION_CONTEXT = new ParseField("exponential_average_calculation_context"); + public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_PER_HOUR_MS = + new ParseField("exponential_average_bucket_processing_time_per_hour_ms"); public static final ParseField TYPE = new ParseField("timing_stats"); @@ -50,13 +56,15 @@ public class TimingStats implements ToXContentObject, Writeable { Double maxBucketProcessingTimeMs = (Double) args[3]; Double avgBucketProcessingTimeMs = (Double) args[4]; Double exponentialAvgBucketProcessingTimeMs = (Double) args[5]; + ExponentialAverageCalculationContext exponentialAvgCalculationContext = (ExponentialAverageCalculationContext) args[6]; return new TimingStats( jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs, - exponentialAvgBucketProcessingTimeMs); + exponentialAvgBucketProcessingTimeMs, + getOrDefault(exponentialAvgCalculationContext, new ExponentialAverageCalculationContext())); }); static { @@ -66,6 +74,7 @@ public class TimingStats implements ToXContentObject, Writeable { PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS); PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS); PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS); + PARSER.declareObject(optionalConstructorArg(), ExponentialAverageCalculationContext.PARSER, EXPONENTIAL_AVG_CALCULATION_CONTEXT); } public static String documentId(String jobId) { @@ -78,6 +87,7 @@ public static String documentId(String jobId) { private Double maxBucketProcessingTimeMs; private Double avgBucketProcessingTimeMs; private Double exponentialAvgBucketProcessingTimeMs; + private final ExponentialAverageCalculationContext exponentialAvgCalculationContext; public TimingStats( String jobId, @@ -85,17 +95,19 @@ public TimingStats( @Nullable Double minBucketProcessingTimeMs, @Nullable Double maxBucketProcessingTimeMs, @Nullable Double avgBucketProcessingTimeMs, - @Nullable Double exponentialAvgBucketProcessingTimeMs) { - this.jobId = jobId; + @Nullable Double exponentialAvgBucketProcessingTimeMs, + ExponentialAverageCalculationContext exponentialAvgCalculationContext) { + this.jobId = Objects.requireNonNull(jobId); this.bucketCount = bucketCount; this.minBucketProcessingTimeMs = minBucketProcessingTimeMs; this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs; this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs; this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs; + this.exponentialAvgCalculationContext = Objects.requireNonNull(exponentialAvgCalculationContext); } public TimingStats(String jobId) { - this(jobId, 0, null, null, null, null); + this(jobId, 0, null, null, null, null, new ExponentialAverageCalculationContext()); } public TimingStats(TimingStats lhs) { @@ -105,7 +117,8 @@ public TimingStats(TimingStats lhs) { lhs.minBucketProcessingTimeMs, lhs.maxBucketProcessingTimeMs, lhs.avgBucketProcessingTimeMs, - lhs.exponentialAvgBucketProcessingTimeMs); + lhs.exponentialAvgBucketProcessingTimeMs, + new ExponentialAverageCalculationContext(lhs.exponentialAvgCalculationContext)); } public TimingStats(StreamInput in) throws IOException { @@ -115,6 +128,11 @@ public TimingStats(StreamInput in) throws IOException { this.maxBucketProcessingTimeMs = in.readOptionalDouble(); this.avgBucketProcessingTimeMs = in.readOptionalDouble(); this.exponentialAvgBucketProcessingTimeMs = in.readOptionalDouble(); + if (in.getVersion().onOrAfter(Version.CURRENT)) { // TODO: Change to V_7_4_0 after backport + this.exponentialAvgCalculationContext = in.readOptionalWriteable(ExponentialAverageCalculationContext::new); + } else { + this.exponentialAvgCalculationContext = new ExponentialAverageCalculationContext(); + } } public String getJobId() { @@ -148,6 +166,15 @@ public Double getExponentialAvgBucketProcessingTimeMs() { return exponentialAvgBucketProcessingTimeMs; } + public Double getExponentialAvgBucketProcessingTimePerHourMs() { + return exponentialAvgCalculationContext.getCurrentExponentialAverageMs(); + } + + // Visible for testing + ExponentialAverageCalculationContext getExponentialAvgCalculationContext() { + return exponentialAvgCalculationContext; + } + /** * Updates the statistics (min, max, avg, exponential avg) for the given data point (bucket processing time). */ @@ -176,6 +203,11 @@ public void updateStats(double bucketProcessingTimeMs) { exponentialAvgBucketProcessingTimeMs = (1 - ALPHA) * exponentialAvgBucketProcessingTimeMs + ALPHA * bucketProcessingTimeMs; } bucketCount++; + exponentialAvgCalculationContext.increment(bucketProcessingTimeMs); + } + + public void setLatestRecordTimestamp(Instant latestRecordTimestamp) { + exponentialAvgCalculationContext.setLatestTimestamp(latestRecordTimestamp); } /** @@ -191,6 +223,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalDouble(maxBucketProcessingTimeMs); out.writeOptionalDouble(avgBucketProcessingTimeMs); out.writeOptionalDouble(exponentialAvgBucketProcessingTimeMs); + if (out.getVersion().onOrAfter(Version.CURRENT)) { // TODO: Change to V_7_4_0 after backport + out.writeOptionalWriteable(exponentialAvgCalculationContext); + } } @Override @@ -216,6 +251,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (exponentialAvgBucketProcessingTimeMs != null) { builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs); } + if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) { + Double expAvgBucketProcessingTimePerHourMs = getExponentialAvgBucketProcessingTimePerHourMs(); + if (expAvgBucketProcessingTimePerHourMs != null) { + builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_PER_HOUR_MS.getPreferredName(), expAvgBucketProcessingTimePerHourMs); + } + } + if (params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false)) { + builder.field(EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), exponentialAvgCalculationContext); + } builder.endObject(); return builder; } @@ -230,7 +274,8 @@ public boolean equals(Object o) { && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs) && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs) && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs) - && Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs); + && Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs) + && Objects.equals(this.exponentialAvgCalculationContext, that.exponentialAvgCalculationContext); } @Override @@ -241,11 +286,16 @@ public int hashCode() { minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs, - exponentialAvgBucketProcessingTimeMs); + exponentialAvgBucketProcessingTimeMs, + exponentialAvgCalculationContext); } @Override public String toString() { return Strings.toString(this); } + + private static T getOrDefault(@Nullable T value, T defaultValue) { + return value != null ? value : defaultValue; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 51717c6bad2d0..76860e28481ce 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import java.util.Arrays; import java.util.HashSet; @@ -185,10 +186,16 @@ public final class ReservedFieldNames { TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), + TimingStats.EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), + DatafeedTimingStats.EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), + + ExponentialAverageCalculationContext.INCREMENTAL_METRIC_VALUE_MS.getPreferredName(), + ExponentialAverageCalculationContext.LATEST_TIMESTAMP.getPreferredName(), + ExponentialAverageCalculationContext.PREVIOUS_EXPONENTIAL_AVERAGE_MS.getPreferredName(), GetResult._ID, GetResult._INDEX, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExponentialAverageCalculationContext.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExponentialAverageCalculationContext.java new file mode 100644 index 0000000000000..25a93a2fe5b5e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExponentialAverageCalculationContext.java @@ -0,0 +1,212 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.utils; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.common.time.TimeUtils; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Utility for calculating current value of exponentially-weighted moving average per fixed-sized time window. + * + * The formula for the current value of the exponentially-weighted moving average is: + * + * currentExponentialAverageMs = alpha * previousExponentialAverageMs + (1 - alpha) * incrementalMetricValueMs + * + * where alpha depends on what fraction of the current time window we've already seen: + * + * alpha = e^(-time_elapsed_since_window_start/window_size) + * time_elapsed_since_window_start = latestTimestamp - window_start + * + * The class holds 3 values based on which it performs the calculation: + * - incrementalMetricValueMs - accumulated value of the metric in the current time window + * - latestTimestamp - timestamp updated as the time passes through the current time window + * - previousExponentialAverageMs - exponential average for previous time windows + * + * incrementalMetricValueMs should be updated using {@link #increment}. + * latestTimestamp should be updated using {@link #setLatestTimestamp}. + * Because it can happen that the timestamp is not available while incrementing the metric value, it is the responsibility of the user + * of this class to always call {@link #setLatestTimestamp} *after* all the relevant (i.e. referring to the points in time before the + * latest timestamp mentioned) {@link #increment} calls are made. + */ +public class ExponentialAverageCalculationContext implements Writeable, ToXContentObject { + + public static final ParseField INCREMENTAL_METRIC_VALUE_MS = new ParseField("incremental_metric_value_ms"); + public static final ParseField LATEST_TIMESTAMP = new ParseField("latest_timestamp"); + public static final ParseField PREVIOUS_EXPONENTIAL_AVERAGE_MS = new ParseField("previous_exponential_average_ms"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "exponential_average_calculation_context", + true, + args -> { + Double incrementalMetricValueMs = (Double) args[0]; + Instant latestTimestamp = (Instant) args[1]; + Double previousExponentialAverageMs = (Double) args[2]; + return new ExponentialAverageCalculationContext( + getOrDefault(incrementalMetricValueMs, 0.0), + latestTimestamp, + previousExponentialAverageMs); + }); + + static { + PARSER.declareDouble(optionalConstructorArg(), INCREMENTAL_METRIC_VALUE_MS); + PARSER.declareField( + optionalConstructorArg(), + p -> TimeUtils.parseTimeFieldToInstant(p, LATEST_TIMESTAMP.getPreferredName()), + LATEST_TIMESTAMP, + ObjectParser.ValueType.VALUE); + PARSER.declareDouble(optionalConstructorArg(), PREVIOUS_EXPONENTIAL_AVERAGE_MS); + } + + private static final TemporalUnit WINDOW_UNIT = ChronoUnit.HOURS; + private static final Duration WINDOW_SIZE = WINDOW_UNIT.getDuration(); + + private double incrementalMetricValueMs; + private Instant latestTimestamp; + private Double previousExponentialAverageMs; + + public ExponentialAverageCalculationContext() { + this(0.0, null, null); + } + + public ExponentialAverageCalculationContext( + double incrementalMetricValueMs, + @Nullable Instant latestTimestamp, + @Nullable Double previousExponentialAverageMs) { + this.incrementalMetricValueMs = incrementalMetricValueMs; + this.latestTimestamp = latestTimestamp != null ? Instant.ofEpochMilli(latestTimestamp.toEpochMilli()) : null; + this.previousExponentialAverageMs = previousExponentialAverageMs; + } + + public ExponentialAverageCalculationContext(ExponentialAverageCalculationContext lhs) { + this(lhs.incrementalMetricValueMs, lhs.latestTimestamp, lhs.previousExponentialAverageMs); + } + + public ExponentialAverageCalculationContext(StreamInput in) throws IOException { + this.incrementalMetricValueMs = in.readDouble(); + this.latestTimestamp = in.readOptionalInstant(); + this.previousExponentialAverageMs = in.readOptionalDouble(); + } + + // Visible for testing + public double getIncrementalMetricValueMs() { + return incrementalMetricValueMs; + } + + // Visible for testing + public Instant getLatestTimestamp() { + return latestTimestamp; + } + + // Visible for testing + public Double getPreviousExponentialAverageMs() { + return previousExponentialAverageMs; + } + + public Double getCurrentExponentialAverageMs() { + if (previousExponentialAverageMs == null || latestTimestamp == null) return incrementalMetricValueMs; + Instant currentWindowStartTimestamp = latestTimestamp.truncatedTo(WINDOW_UNIT); + double alpha = Math.exp( + - (double) Duration.between(currentWindowStartTimestamp, latestTimestamp).toMillis() / WINDOW_SIZE.toMillis()); + return alpha * previousExponentialAverageMs + (1 - alpha) * incrementalMetricValueMs; + } + + /** + * Increments the current accumulated metric value by the given delta. + */ + public void increment(double metricValueDeltaMs) { + incrementalMetricValueMs += metricValueDeltaMs; + } + + /** + * Sets the latest timestamp that serves as an indication of the current point in time. + * Before calling this method make sure all the associated calls to {@link #increment} were already made. + */ + public void setLatestTimestamp(Instant newLatestTimestamp) { + Objects.requireNonNull(newLatestTimestamp); + if (this.latestTimestamp != null) { + Instant nextWindowStartTimestamp = this.latestTimestamp.truncatedTo(WINDOW_UNIT).plus(WINDOW_SIZE); + if (newLatestTimestamp.compareTo(nextWindowStartTimestamp) >= 0) { + // When we cross the boundary between windows, we update the exponential average with metric values accumulated so far in + // incrementalMetricValueMs variable. + this.previousExponentialAverageMs = getCurrentExponentialAverageMs(); + this.incrementalMetricValueMs = 0.0; + } + } else { + // This is the first time {@link #setLatestRecordTimestamp} is called on this object. + } + if (this.latestTimestamp == null || newLatestTimestamp.isAfter(this.latestTimestamp)) { + this.latestTimestamp = newLatestTimestamp; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(incrementalMetricValueMs); + out.writeOptionalInstant(latestTimestamp); + out.writeOptionalDouble(previousExponentialAverageMs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(INCREMENTAL_METRIC_VALUE_MS.getPreferredName(), incrementalMetricValueMs); + if (latestTimestamp != null) { + builder.timeField( + LATEST_TIMESTAMP.getPreferredName(), + LATEST_TIMESTAMP.getPreferredName() + "_string", + latestTimestamp.toEpochMilli()); + } + if (previousExponentialAverageMs != null) { + builder.field(PREVIOUS_EXPONENTIAL_AVERAGE_MS.getPreferredName(), previousExponentialAverageMs); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if (o == null || getClass() != o.getClass()) return false; + ExponentialAverageCalculationContext that = (ExponentialAverageCalculationContext) o; + return this.incrementalMetricValueMs == that.incrementalMetricValueMs + && Objects.equals(this.latestTimestamp, that.latestTimestamp) + && Objects.equals(this.previousExponentialAverageMs, that.previousExponentialAverageMs); + } + + @Override + public int hashCode() { + return Objects.hash(incrementalMetricValueMs, latestTimestamp, previousExponentialAverageMs); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @SuppressWarnings("unchecked") + private static T getOrDefault(@Nullable T value, T defaultValue) { + return value != null ? value : defaultValue; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java index 74bc71074260e..cc9bbabd12326 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStatsTests; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import java.io.IOException; import java.net.InetAddress; @@ -78,7 +79,8 @@ public void testDatafeedStatsToXContent() throws IOException { Set.of(), Version.CURRENT); - DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 10, 100.0); + DatafeedTimingStats timingStats = + new DatafeedTimingStats("my-job-id", 5, 10, 100.0, new ExponentialAverageCalculationContext(50.0, null, null)); Response.DatafeedStats stats = new Response.DatafeedStats("df-id", DatafeedState.STARTED, node, null, timingStats); @@ -110,11 +112,12 @@ public void testDatafeedStatsToXContent() throws IOException { assertThat(nodeAttributes, hasEntry("ml.max_open_jobs", "5")); Map timingStatsMap = (Map) dfStatsMap.get("timing_stats"); - assertThat(timingStatsMap.size(), is(equalTo(5))); + assertThat(timingStatsMap.size(), is(equalTo(6))); assertThat(timingStatsMap, hasEntry("job_id", "my-job-id")); assertThat(timingStatsMap, hasEntry("search_count", 5)); assertThat(timingStatsMap, hasEntry("bucket_count", 10)); assertThat(timingStatsMap, hasEntry("total_search_time_ms", 100.0)); assertThat(timingStatsMap, hasEntry("average_search_time_per_bucket_ms", 10.0)); + assertThat(timingStatsMap, hasEntry("exponential_average_search_time_per_hour_ms", 50.0)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java index e8d7798ba6cc0..9ca522074e4d3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java @@ -7,12 +7,18 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContextTests; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import java.io.IOException; +import java.time.Instant; +import java.util.Collections; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; @@ -23,7 +29,12 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase { + + public static ExponentialAverageCalculationContext createRandom() { + return new ExponentialAverageCalculationContext( + randomDouble(), + randomBoolean() ? Instant.now() : null, + randomBoolean() ? randomDouble() : null); + } + + @Override + public ExponentialAverageCalculationContext createTestInstance() { + return createRandom(); + } + + @Override + protected Writeable.Reader instanceReader() { + return ExponentialAverageCalculationContext::new; + } + + @Override + protected ExponentialAverageCalculationContext doParseInstance(XContentParser parser) { + return ExponentialAverageCalculationContext.PARSER.apply(parser, null); + } + + public void testDefaultConstructor() { + ExponentialAverageCalculationContext context = new ExponentialAverageCalculationContext(); + + assertThat(context.getIncrementalMetricValueMs(), equalTo(0.0)); + assertThat(context.getLatestTimestamp(), nullValue()); + assertThat(context.getPreviousExponentialAverageMs(), nullValue()); + } + + public void testConstructor() { + ExponentialAverageCalculationContext context = + new ExponentialAverageCalculationContext(1.23, Instant.ofEpochMilli(123456789), 4.56); + + assertThat(context.getIncrementalMetricValueMs(), equalTo(1.23)); + assertThat(context.getLatestTimestamp(), equalTo(Instant.ofEpochMilli(123456789))); + assertThat(context.getPreviousExponentialAverageMs(), equalTo(4.56)); + } + + public void testCopyConstructor() { + ExponentialAverageCalculationContext context1 = + new ExponentialAverageCalculationContext(1.23, Instant.ofEpochMilli(123456789), 4.56); + ExponentialAverageCalculationContext context2 = new ExponentialAverageCalculationContext(context1); + + assertThat(context2.getIncrementalMetricValueMs(), equalTo(1.23)); + assertThat(context2.getLatestTimestamp(), equalTo(Instant.ofEpochMilli(123456789))); + assertThat(context2.getPreviousExponentialAverageMs(), equalTo(4.56)); + assertThat(context2.getCurrentExponentialAverageMs(), equalTo(context1.getCurrentExponentialAverageMs())); + } + + public void testExponentialAverageCalculation() { + ExponentialAverageCalculationContext context = new ExponentialAverageCalculationContext(0.0, null, null); + assertThat(context.getIncrementalMetricValueMs(), equalTo(0.0)); + assertThat(context.getLatestTimestamp(), nullValue()); + assertThat(context.getPreviousExponentialAverageMs(), nullValue()); + assertThat(context.getCurrentExponentialAverageMs(), equalTo(0.0)); + + context.increment(100.0); + context.increment(100.0); + context.increment(100.0); + assertThat(context.getIncrementalMetricValueMs(), equalTo(300.0)); + assertThat(context.getLatestTimestamp(), nullValue()); + assertThat(context.getPreviousExponentialAverageMs(), nullValue()); + assertThat(context.getCurrentExponentialAverageMs(), equalTo(300.0)); + + context.setLatestTimestamp(Instant.parse("2019-07-19T03:30:00.00Z")); + assertThat(context.getIncrementalMetricValueMs(), equalTo(300.0)); + assertThat(context.getLatestTimestamp(), equalTo(Instant.parse("2019-07-19T03:30:00.00Z"))); + assertThat(context.getPreviousExponentialAverageMs(), nullValue()); + assertThat(context.getCurrentExponentialAverageMs(), equalTo(300.0)); + + context.increment(200.0); + assertThat(context.getIncrementalMetricValueMs(), equalTo(500.0)); + assertThat(context.getLatestTimestamp(), equalTo(Instant.parse("2019-07-19T03:30:00.00Z"))); + assertThat(context.getPreviousExponentialAverageMs(), nullValue()); + assertThat(context.getCurrentExponentialAverageMs(), equalTo(500.0)); + + context.setLatestTimestamp(Instant.parse("2019-07-19T04:00:01.00Z")); + assertThat(context.getIncrementalMetricValueMs(), equalTo(0.0)); + assertThat(context.getLatestTimestamp(), equalTo(Instant.parse("2019-07-19T04:00:01.00Z"))); + assertThat(context.getPreviousExponentialAverageMs(), equalTo(500.0)); + assertThat(context.getCurrentExponentialAverageMs(), closeTo(499.8, 0.1)); + + context.increment(1000.0); + context.setLatestTimestamp(Instant.parse("2019-07-19T04:30:00.00Z")); + assertThat(context.getIncrementalMetricValueMs(), equalTo(1000.0)); + assertThat(context.getLatestTimestamp(), equalTo(Instant.parse("2019-07-19T04:30:00.00Z"))); + assertThat(context.getPreviousExponentialAverageMs(), equalTo(500.0)); + assertThat(context.getCurrentExponentialAverageMs(), closeTo(696.7, 0.1)); + } + + public void testExponentialAverageCalculationOnWindowBoundary() { + ExponentialAverageCalculationContext context = + new ExponentialAverageCalculationContext(500.0, Instant.parse("2019-07-19T04:25:06.00Z"), 200.0); + assertThat(context.getIncrementalMetricValueMs(), equalTo(500.0)); + assertThat(context.getLatestTimestamp(), equalTo(Instant.parse("2019-07-19T04:25:06.00Z"))); + assertThat(context.getPreviousExponentialAverageMs(), equalTo(200.0)); + assertThat(context.getCurrentExponentialAverageMs(), closeTo(302.5, 0.1)); + + context.setLatestTimestamp(Instant.parse("2019-07-19T05:00:00.00Z")); + assertThat(context.getIncrementalMetricValueMs(), equalTo(0.0)); + assertThat(context.getLatestTimestamp(), equalTo(Instant.parse("2019-07-19T05:00:00.00Z"))); + assertThat(context.getPreviousExponentialAverageMs(), closeTo(302.5, 0.1)); + assertThat(context.getCurrentExponentialAverageMs(), equalTo(context.getPreviousExponentialAverageMs())); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java index fbb32395f14ef..7df3919c459b3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java @@ -46,7 +46,7 @@ public void reportSearchDuration(TimeValue searchDuration) { if (searchDuration == null) { return; } - currentTimingStats.incrementTotalSearchTimeMs(searchDuration.millis()); + currentTimingStats.incrementSearchTimeMs(searchDuration.millis()); flushIfDifferSignificantly(); } @@ -58,6 +58,9 @@ public void reportDataCounts(DataCounts dataCounts) { return; } currentTimingStats.incrementBucketCount(dataCounts.getBucketCount()); + if (dataCounts.getLatestRecordTimeStamp() != null) { + currentTimingStats.setLatestRecordTimestamp(dataCounts.getLatestRecordTimeStamp().toInstant()); + } flushIfDifferSignificantly(); } @@ -79,7 +82,8 @@ private void flush() { public static boolean differSignificantly(DatafeedTimingStats stats1, DatafeedTimingStats stats2) { return countsDifferSignificantly(stats1.getSearchCount(), stats2.getSearchCount()) || differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs()) - || differSignificantly(stats1.getAvgSearchTimePerBucketMs(), stats2.getAvgSearchTimePerBucketMs()); + || differSignificantly(stats1.getAvgSearchTimePerBucketMs(), stats2.getAvgSearchTimePerBucketMs()) + || differSignificantly(stats1.getExponentialAvgSearchTimePerHourMs(), stats2.getExponentialAvgSearchTimePerHourMs()); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java index 0da4046edb49e..69d6936e9051f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.job.persistence; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; import java.util.Objects; @@ -35,8 +36,9 @@ public TimingStats getCurrentTimingStats() { return new TimingStats(currentTimingStats); } - public void reportBucketProcessingTime(long bucketProcessingTimeMs) { - currentTimingStats.updateStats(bucketProcessingTimeMs); + public void reportBucket(Bucket bucket) { + currentTimingStats.updateStats(bucket.getProcessingTimeMs()); + currentTimingStats.setLatestRecordTimestamp(bucket.getTimestamp().toInstant().plusSeconds(bucket.getBucketSpan())); if (differSignificantly(currentTimingStats, persistedTimingStats)) { flush(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index d2d052b1a3e6c..f596fbc669af6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -213,7 +213,7 @@ void processResult(AutodetectResult result) { // persist after deleting interim results in case the new // results are also interim - timingStatsReporter.reportBucketProcessingTime(bucket.getProcessingTimeMs()); + timingStatsReporter.reportBucket(bucket); bulkResultsPersister.persistBucket(bucket).executeRequest(); ++bucketCount; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java index 9c48dd780a321..6daa0f5a0b842 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.junit.Before; import org.mockito.InOrder; @@ -35,81 +36,76 @@ public void setUpTests() { } public void testReportSearchDuration_Null() { - DatafeedTimingStatsReporter timingStatsReporter = - new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); timingStatsReporter.reportSearchDuration(null); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); verifyZeroInteractions(jobResultsPersister); } public void testReportSearchDuration_Zero() { - DatafeedTimingStatsReporter timingStatsReporter = - new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID), jobResultsPersister); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 0, 0, 0.0))); + DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0)); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 0, 0, 0.0))); timingStatsReporter.reportSearchDuration(TimeValue.ZERO); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 1, 0, 0.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0))); - verify(jobResultsPersister).persistDatafeedTimingStats(new DatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.IMMEDIATE); + verify(jobResultsPersister).persistDatafeedTimingStats(createDatafeedTimingStats(JOB_ID, 1, 0, 0.0), RefreshPolicy.IMMEDIATE); verifyNoMoreInteractions(jobResultsPersister); } public void testReportSearchDuration() { - DatafeedTimingStatsReporter timingStatsReporter = - new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 13, 10, 10000.0), jobResultsPersister); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 13, 10, 10000.0))); + DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0)); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 13, 10, 10000.0, 10000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 14, 10, 11000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 14, 10, 11000.0, 11000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 15, 10, 12000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 16, 10, 13000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 16, 10, 13000.0, 13000.0))); timingStatsReporter.reportSearchDuration(ONE_SECOND); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 17, 10, 14000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0))); InOrder inOrder = inOrder(jobResultsPersister); inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - new DatafeedTimingStats(JOB_ID, 15, 10, 12000.0), RefreshPolicy.IMMEDIATE); + createDatafeedTimingStats(JOB_ID, 15, 10, 12000.0, 12000.0), RefreshPolicy.IMMEDIATE); inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - new DatafeedTimingStats(JOB_ID, 17, 10, 14000.0), RefreshPolicy.IMMEDIATE); + createDatafeedTimingStats(JOB_ID, 17, 10, 14000.0, 14000.0), RefreshPolicy.IMMEDIATE); verifyNoMoreInteractions(jobResultsPersister); } public void testReportDataCounts_Null() { - DatafeedTimingStatsReporter timingStatsReporter = - new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0)); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); timingStatsReporter.reportDataCounts(null); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 10, 10000.0))); verifyZeroInteractions(jobResultsPersister); } public void testReportDataCounts() { - DatafeedTimingStatsReporter timingStatsReporter = - new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0), jobResultsPersister); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0))); + DatafeedTimingStatsReporter timingStatsReporter = createReporter(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0)); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 20, 10000.0))); timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 21, 10000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 21, 10000.0))); timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 22, 10000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 22, 10000.0))); timingStatsReporter.reportDataCounts(createDataCountsWithBucketCount(1)); - assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0))); + assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0))); InOrder inOrder = inOrder(jobResultsPersister); inOrder.verify(jobResultsPersister).persistDatafeedTimingStats( - new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE); + createDatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE); verifyNoMoreInteractions(jobResultsPersister); } @@ -122,35 +118,57 @@ private static DataCounts createDataCountsWithBucketCount(long bucketCount) { public void testTimingStatsDifferSignificantly() { assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 1000.0), createDatafeedTimingStats(JOB_ID, 5, 10, 1000.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1100.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 1000.0), createDatafeedTimingStats(JOB_ID, 5, 10, 1100.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1120.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 1000.0), createDatafeedTimingStats(JOB_ID, 5, 10, 1120.0)), is(true)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11000.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 10000.0), createDatafeedTimingStats(JOB_ID, 5, 10, 11000.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11200.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 10000.0), createDatafeedTimingStats(JOB_ID, 5, 10, 11200.0)), is(true)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110000.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 100000.0), createDatafeedTimingStats(JOB_ID, 5, 10, 110000.0)), is(false)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110001.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 100000.0), createDatafeedTimingStats(JOB_ID, 5, 10, 110001.0)), is(true)); assertThat( DatafeedTimingStatsReporter.differSignificantly( - new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 50, 10, 100000.0)), + createDatafeedTimingStats(JOB_ID, 5, 10, 100000.0), createDatafeedTimingStats(JOB_ID, 50, 10, 100000.0)), is(true)); } + + private DatafeedTimingStatsReporter createReporter(DatafeedTimingStats timingStats) { + return new DatafeedTimingStatsReporter(timingStats, jobResultsPersister); + } + + private static DatafeedTimingStats createDatafeedTimingStats( + String jobId, + long searchCount, + long bucketCount, + double totalSearchTimeMs) { + return createDatafeedTimingStats(jobId, searchCount, bucketCount, totalSearchTimeMs, 0.0); + } + + private static DatafeedTimingStats createDatafeedTimingStats( + String jobId, + long searchCount, + long bucketCount, + double totalSearchTimeMs, + double incrementalSearchTimeMs) { + ExponentialAverageCalculationContext context = new ExponentialAverageCalculationContext(incrementalSearchTimeMs, null, null); + return new DatafeedTimingStats(jobId, searchCount, bucketCount, totalSearchTimeMs, context); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 94017ef266f2c..4aff83ab39065 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -25,9 +25,11 @@ import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -208,7 +210,9 @@ public void testPersistTimingStats() { Client client = mockClient(bulkRequestCaptor); JobResultsPersister persister = new JobResultsPersister(client); - TimingStats timingStats = new TimingStats("foo", 7, 1.0, 2.0, 1.23, 7.89); + TimingStats timingStats = + new TimingStats( + "foo", 7, 1.0, 2.0, 1.23, 7.89, new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(123456789), 60.0)); persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest(); verify(client, times(1)).bulk(bulkRequestCaptor.capture()); @@ -227,7 +231,11 @@ public void testPersistTimingStats() { "minimum_bucket_processing_time_ms", 1.0, "maximum_bucket_processing_time_ms", 2.0, "average_bucket_processing_time_ms", 1.23, - "exponential_average_bucket_processing_time_ms", 7.89))); + "exponential_average_bucket_processing_time_ms", 7.89, + "exponential_average_calculation_context", Map.of( + "incremental_metric_value_ms", 600.0, + "previous_exponential_average_ms", 60.0, + "latest_timestamp", 123456789)))); verify(client, times(1)).threadPool(); verifyNoMoreInteractions(client); @@ -247,7 +255,9 @@ public void testPersistDatafeedTimingStats() { .when(client).index(any(), any(ActionListener.class)); JobResultsPersister persister = new JobResultsPersister(client); - DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 66, 666.0); + DatafeedTimingStats timingStats = + new DatafeedTimingStats( + "foo", 6, 66, 666.0, new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(123456789), 60.0)); persister.persistDatafeedTimingStats(timingStats, WriteRequest.RefreshPolicy.IMMEDIATE); ArgumentCaptor indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); @@ -264,7 +274,11 @@ public void testPersistDatafeedTimingStats() { "job_id", "foo", "search_count", 6, "bucket_count", 66, - "total_search_time_ms", 666.0))); + "total_search_time_ms", 666.0, + "exponential_average_calculation_context", Map.of( + "incremental_metric_value_ms", 600.0, + "previous_exponential_average_ms", 60.0, + "latest_timestamp", 123456789)))); verify(client, times(1)).threadPool(); verifyNoMoreInteractions(client); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java index cf005d3c3da59..d845bb5751fd4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java @@ -55,10 +55,12 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -845,7 +847,11 @@ public void testTimingStats_Ok() throws IOException { TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1.0, TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1000.0, TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0, - TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0)); + TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0, + TimingStats.EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), Map.of( + ExponentialAverageCalculationContext.INCREMENTAL_METRIC_VALUE_MS.getPreferredName(), 100.0, + ExponentialAverageCalculationContext.LATEST_TIMESTAMP.getPreferredName(), Instant.ofEpochMilli(1000_000_000), + ExponentialAverageCalculationContext.PREVIOUS_EXPONENTIAL_AVERAGE_MS.getPreferredName(), 200.0))); SearchResponse response = createSearchResponse(source); Client client = getMockedClient( queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")), @@ -853,9 +859,11 @@ public void testTimingStats_Ok() throws IOException { when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName)); JobResultsProvider provider = createProvider(client); + ExponentialAverageCalculationContext context = + new ExponentialAverageCalculationContext(100.0, Instant.ofEpochMilli(1000_000_000), 200.0); provider.timingStats( "foo", - stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0, 777.0))), + stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0, 777.0, context))), e -> { throw new AssertionError(); }); verify(client).prepareSearch(indexName); @@ -904,14 +912,22 @@ public void testDatafeedTimingStats_MultipleDocumentsAtOnce() throws IOException Job.ID.getPreferredName(), "foo", DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6, DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66, - DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0)); + DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0, + DatafeedTimingStats.EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), Map.of( + ExponentialAverageCalculationContext.INCREMENTAL_METRIC_VALUE_MS.getPreferredName(), 600.0, + ExponentialAverageCalculationContext.LATEST_TIMESTAMP.getPreferredName(), Instant.ofEpochMilli(100000600), + ExponentialAverageCalculationContext.PREVIOUS_EXPONENTIAL_AVERAGE_MS.getPreferredName(), 60.0))); List> sourceBar = Arrays.asList( Map.of( Job.ID.getPreferredName(), "bar", DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 7, DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 77, - DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 777.0)); + DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 777.0, + DatafeedTimingStats.EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), Map.of( + ExponentialAverageCalculationContext.INCREMENTAL_METRIC_VALUE_MS.getPreferredName(), 700.0, + ExponentialAverageCalculationContext.LATEST_TIMESTAMP.getPreferredName(), Instant.ofEpochMilli(100000700), + ExponentialAverageCalculationContext.PREVIOUS_EXPONENTIAL_AVERAGE_MS.getPreferredName(), 70.0))); SearchResponse responseFoo = createSearchResponse(sourceFoo); SearchResponse responseBar = createSearchResponse(sourceBar); MultiSearchResponse multiSearchResponse = new MultiSearchResponse( @@ -940,6 +956,10 @@ public void testDatafeedTimingStats_MultipleDocumentsAtOnce() throws IOException new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(AnomalyDetectorsIndex.jobResultsAliasedName("bar"))); JobResultsProvider provider = createProvider(client); + ExponentialAverageCalculationContext contextFoo = + new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(100000600), 60.0); + ExponentialAverageCalculationContext contextBar = + new ExponentialAverageCalculationContext(700.0, Instant.ofEpochMilli(100000700), 70.0); provider.datafeedTimingStats( List.of("foo", "bar"), statsByJobId -> @@ -947,8 +967,8 @@ public void testDatafeedTimingStats_MultipleDocumentsAtOnce() throws IOException statsByJobId, equalTo( Map.of( - "foo", new DatafeedTimingStats("foo", 6, 66, 666.0), - "bar", new DatafeedTimingStats("bar", 7, 77, 777.0)))), + "foo", new DatafeedTimingStats("foo", 6, 66, 666.0, contextFoo), + "bar", new DatafeedTimingStats("bar", 7, 77, 777.0, contextBar)))), e -> { throw new AssertionError(); }); verify(client).threadPool(); @@ -967,7 +987,11 @@ public void testDatafeedTimingStats_Ok() throws IOException { Job.ID.getPreferredName(), "foo", DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6, DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66, - DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0)); + DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0, + DatafeedTimingStats.EXPONENTIAL_AVG_CALCULATION_CONTEXT.getPreferredName(), Map.of( + ExponentialAverageCalculationContext.INCREMENTAL_METRIC_VALUE_MS.getPreferredName(), 600.0, + ExponentialAverageCalculationContext.LATEST_TIMESTAMP.getPreferredName(), Instant.ofEpochMilli(100000600), + ExponentialAverageCalculationContext.PREVIOUS_EXPONENTIAL_AVERAGE_MS.getPreferredName(), 60.0))); SearchResponse response = createSearchResponse(source); Client client = getMockedClient( queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")), @@ -975,9 +999,11 @@ public void testDatafeedTimingStats_Ok() throws IOException { when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName)); JobResultsProvider provider = createProvider(client); + ExponentialAverageCalculationContext contextFoo = + new ExponentialAverageCalculationContext(600.0, Instant.ofEpochMilli(100000600), 60.0); provider.datafeedTimingStats( "foo", - stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 66, 666.0))), + stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 66, 666.0, contextFoo))), e -> { throw new AssertionError(); }); verify(client).prepareSearch(indexName); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java index 4e5a97f860d9d..9e1e5646e115d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java @@ -5,11 +5,18 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.elasticsearch.common.Nullable; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import org.junit.Before; import org.mockito.InOrder; +import java.time.Duration; +import java.time.Instant; +import java.util.Date; + import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.inOrder; @@ -20,6 +27,8 @@ public class TimingStatsReporterTests extends ESTestCase { private static final String JOB_ID = "my-job-id"; + private static final Instant TIMESTAMP = Instant.ofEpochMilli(1000000000); + private static final Duration BUCKET_SPAN = Duration.ofMinutes(1); private JobResultsPersister.Builder bulkResultsPersister; @@ -29,56 +38,56 @@ public void setUpTests() { } public void testGetCurrentTimingStats() { - TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89); - TimingStatsReporter reporter = new TimingStatsReporter(stats, bulkResultsPersister); + TimingStats stats = createTimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89); + TimingStatsReporter reporter = createReporter(stats); assertThat(reporter.getCurrentTimingStats(), equalTo(stats)); verifyZeroInteractions(bulkResultsPersister); } public void testReporting() { - TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister); + TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID))); - reporter.reportBucketProcessingTime(10); - assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0))); + reporter.reportBucket(createBucket(10)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0))); - reporter.reportBucketProcessingTime(20); - assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1))); + reporter.reportBucket(createBucket(20)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createTimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1, 30.0))); - reporter.reportBucketProcessingTime(15); - assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 20.0, 15.0, 10.149))); + reporter.reportBucket(createBucket(15)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createTimingStats(JOB_ID, 3, 10.0, 20.0, 15.0, 10.149, 45.0))); InOrder inOrder = inOrder(bulkResultsPersister); - inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0)); - inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1)); + inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); + inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1, 30.0)); inOrder.verifyNoMoreInteractions(); } public void testFinishReporting() { - TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister); + TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID))); - reporter.reportBucketProcessingTime(10); - assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0))); + reporter.reportBucket(createBucket(10)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0))); - reporter.reportBucketProcessingTime(10); - assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 2, 10.0, 10.0, 10.0, 10.0))); + reporter.reportBucket(createBucket(10)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createTimingStats(JOB_ID, 2, 10.0, 10.0, 10.0, 10.0, 20.0))); - reporter.reportBucketProcessingTime(10); - assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0))); + reporter.reportBucket(createBucket(10)); + assertThat(reporter.getCurrentTimingStats(), equalTo(createTimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0, 30.0))); reporter.finishReporting(); - assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0))); + assertThat(reporter.getCurrentTimingStats(), equalTo(createTimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0, 30.0))); InOrder inOrder = inOrder(bulkResultsPersister); - inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0)); - inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0)); + inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); + inOrder.verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0, 30.0)); inOrder.verifyNoMoreInteractions(); } public void testFinishReportingNoChange() { - TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister); + TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); reporter.finishReporting(); @@ -86,27 +95,27 @@ public void testFinishReportingNoChange() { } public void testFinishReportingWithChange() { - TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister); + TimingStatsReporter reporter = createReporter(new TimingStats(JOB_ID)); - reporter.reportBucketProcessingTime(10); + reporter.reportBucket(createBucket(10)); reporter.finishReporting(); - verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0)); + verify(bulkResultsPersister).persistTimingStats(createTimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0, 10.0)); } public void testTimingStatsDifferSignificantly() { assertThat( TimingStatsReporter.differSignificantly( - new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)), + createTimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), createTimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)), is(false)); assertThat( TimingStatsReporter.differSignificantly( - new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)), + createTimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), createTimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)), is(false)); assertThat( TimingStatsReporter.differSignificantly( - new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)), + createTimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), createTimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)), is(true)); } @@ -121,4 +130,51 @@ public void testValuesDifferSignificantly() { assertThat(TimingStatsReporter.differSignificantly(0.0, 1.0), is(true)); assertThat(TimingStatsReporter.differSignificantly(1.0, 0.0), is(true)); } + + private TimingStatsReporter createReporter(TimingStats timingStats) { + return new TimingStatsReporter(timingStats, bulkResultsPersister); + } + + private static TimingStats createTimingStats( + String jobId, + long bucketCount, + @Nullable Double minBucketProcessingTimeMs, + @Nullable Double maxBucketProcessingTimeMs, + @Nullable Double avgBucketProcessingTimeMs, + @Nullable Double exponentialAvgBucketProcessingTimeMs) { + return createTimingStats( + jobId, + bucketCount, + minBucketProcessingTimeMs, + maxBucketProcessingTimeMs, + avgBucketProcessingTimeMs, + exponentialAvgBucketProcessingTimeMs, + 0.0); + } + + private static TimingStats createTimingStats( + String jobId, + long bucketCount, + @Nullable Double minBucketProcessingTimeMs, + @Nullable Double maxBucketProcessingTimeMs, + @Nullable Double avgBucketProcessingTimeMs, + @Nullable Double exponentialAvgBucketProcessingTimeMs, + double incrementalBucketProcessingTimeMs) { + ExponentialAverageCalculationContext context = + new ExponentialAverageCalculationContext(incrementalBucketProcessingTimeMs, TIMESTAMP.plus(BUCKET_SPAN), null); + return new TimingStats( + jobId, + bucketCount, + minBucketProcessingTimeMs, + maxBucketProcessingTimeMs, + avgBucketProcessingTimeMs, + exponentialAvgBucketProcessingTimeMs, + context); + } + + private static Bucket createBucket(long processingTimeMs) { + Bucket bucket = new Bucket(JOB_ID, Date.from(TIMESTAMP), BUCKET_SPAN.getSeconds()); + bucket.setProcessingTimeMs(processingTimeMs); + return bucket; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index 1e0263185798f..586f3c039e21b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -134,7 +134,7 @@ public void testProcessResult_bucket() { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); - Bucket bucket = mock(Bucket.class); + Bucket bucket = new Bucket(JOB_ID, new Date(), BUCKET_SPAN_MS); when(result.getBucket()).thenReturn(bucket); processorUnderTest.setDeleteInterimRequired(false); @@ -151,7 +151,7 @@ public void testProcessResult_bucket_deleteInterimRequired() { when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutodetectResult result = mock(AutodetectResult.class); - Bucket bucket = mock(Bucket.class); + Bucket bucket = new Bucket(JOB_ID, new Date(), BUCKET_SPAN_MS); when(result.getBucket()).thenReturn(bucket); processorUnderTest.processResult(result); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java index 616dd6cba5a14..2740f0ec01cb9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java @@ -7,6 +7,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import static org.hamcrest.Matchers.equalTo; @@ -15,13 +16,17 @@ public class AutodetectParamsTests extends ESTestCase { private static final String JOB_ID = "my-job"; public void testBuilder_WithTimingStats() { - TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0); + TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0, new ExponentialAverageCalculationContext()); AutodetectParams params = new AutodetectParams.Builder(JOB_ID).setTimingStats(timingStats).build(); assertThat(params.timingStats(), equalTo(timingStats)); timingStats.updateStats(2000.0); - assertThat(timingStats, equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75, 1010.0))); - assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0))); + assertThat( + timingStats, + equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75, 1010.0, new ExponentialAverageCalculationContext(2000.0, null, null)))); + assertThat( + params.timingStats(), + equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0, new ExponentialAverageCalculationContext()))); } public void testBuilder_WithoutTimingStats() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java index aa56e392de2bb..9cab6c725a71d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java @@ -27,13 +27,15 @@ public class BucketTests extends AbstractSerializingTestCase { + private static final long MAX_BUCKET_SPAN_SEC = 100_000_000_000L; // bucket span of > 3000 years should be enough for everyone + @Override public Bucket createTestInstance() { return createTestInstance("foo"); } public Bucket createTestInstance(String jobId) { - Bucket bucket = new Bucket(jobId, randomDate(), randomNonNegativeLong()); + Bucket bucket = new Bucket(jobId, randomDate(), randomLongBetween(1, MAX_BUCKET_SPAN_SEC)); if (randomBoolean()) { bucket.setAnomalyScore(randomDouble()); } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java index ebfbf8d223db9..23de6258b3fe0 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.utils.ExponentialAverageCalculationContext; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase; @@ -103,7 +104,8 @@ public void testToXContent() throws IOException { final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7); final ForecastStats forecastStats = new ForecastStats(); - final TimingStats timingStats = new TimingStats("_job_id", 100, 10.0, 30.0, 20.0, 25.0); + final TimingStats timingStats = new TimingStats( + "_job_id", 100, 10.0, 30.0, 20.0, 25.0, new ExponentialAverageCalculationContext(50.0, null, null)); final JobStats jobStats = new JobStats( "_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, "_explanation", time, timingStats); final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L); @@ -179,7 +181,8 @@ public void testToXContent() throws IOException { + "\"minimum_bucket_processing_time_ms\":10.0," + "\"maximum_bucket_processing_time_ms\":30.0," + "\"average_bucket_processing_time_ms\":20.0," - + "\"exponential_average_bucket_processing_time_ms\":25.0" + + "\"exponential_average_bucket_processing_time_ms\":25.0," + + "\"exponential_average_bucket_processing_time_per_hour_ms\":50.0" + "}" + "}" + "}", xContent.utf8ToString());