Commit 057cdff

[ML] Refactor DataStreamDiagnostics to use array (#30129)

This commit refactors the DataStreamDiagnostics class, achieving the following advantages:

- simpler code, by encapsulating the moving bucket histogram into its own class
- better performance, by using an array to store the buckets instead of a map
- explicit handling of gap buckets, in preparation for fixing #30080
1 parent acdf330 commit 057cdff
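
The central change is that the moving histogram of bucket counts is now backed by a fixed-size long[] used as a circular buffer, rather than a map keyed on bucket start time. As a rough illustration of that idea only (a minimal standalone sketch with hypothetical names, not code from this commit), counting a record reduces to one array increment at a wrapped index:

public class CircularBucketSketch {
    public static void main(String[] args) {
        // Assumed values for the sketch: a 1-minute bucket span and a
        // 10-bucket moving window. Each bucket in the window owns one array
        // slot, so counting a record is a single O(1) increment with no
        // per-bucket map entries or boxing.
        long bucketSpanMs = 60_000L;
        int windowSize = 10;
        long[] counts = new long[windowSize];

        long recordTimestampMs = 1_525_000_123_456L;
        // Align the timestamp down to the start of its bucket, then wrap the
        // bucket ordinal into the circular buffer.
        long bucketStartMs = (recordTimestampMs / bucketSpanMs) * bucketSpanMs;
        int index = (int) ((bucketStartMs / bucketSpanMs) % windowSize);
        counts[index]++;

        System.out.println("bucket starting at " + bucketStartMs + " ms has " + counts[index] + " record(s)");
    }
}

The committed BucketDiagnostics class (below) computes the slot relative to the latest bucket instead of from an absolute ordinal, which also lets it place late records that arrive within the configured latency window.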

File tree

5 files changed: +319, -230 lines


x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java

Lines changed: 2 additions & 1 deletion
@@ -12,8 +12,9 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
-import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
+import org.elasticsearch.xpack.ml.job.process.diagnostics.DataStreamDiagnostics;
 
 import java.util.Date;
 import java.util.Locale;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java

Lines changed: 0 additions & 223 deletions
This file was deleted.
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java

Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@

/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.process.diagnostics;

import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.Intervals;

/**
 * A moving window of buckets that allow keeping
 * track of some statistics like the bucket count,
 * empty or sparse buckets, etc.
 *
 * The counts are stored in an array that functions as a
 * circular buffer. When time is advanced, all buckets
 * out of the window are flushed.
 */
class BucketDiagnostics {

    private static final int MIN_BUCKETS = 10;

    private final long bucketSpanMs;
    private final long latencyMs;
    private final int maxSize;
    private final long[] buckets;
    private long movingBucketCount = 0;
    private long latestBucketStartMs = -1;
    private int latestBucketIndex;
    private long earliestBucketStartMs = -1;
    private int earliestBucketIndex;
    private long latestFlushedBucketStartMs = -1;
    private final BucketFlushListener bucketFlushListener;

    BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) {
        bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis();
        latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis();
        maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS);
        buckets = new long[maxSize];
        this.bucketFlushListener = bucketFlushListener;
    }

    void addRecord(long recordTimestampMs) {
        long bucketStartMs = Intervals.alignToFloor(recordTimestampMs, bucketSpanMs);

        // Initialize earliest/latest times
        if (latestBucketStartMs < 0) {
            latestBucketStartMs = bucketStartMs;
            earliestBucketStartMs = bucketStartMs;
        }

        advanceTime(bucketStartMs);
        addToBucket(bucketStartMs);
    }

    private void advanceTime(long bucketStartMs) {
        while (bucketStartMs > latestBucketStartMs) {
            int flushBucketIndex = (latestBucketIndex + 1) % maxSize;

            if (flushBucketIndex == earliestBucketIndex) {
                flush(flushBucketIndex);
                movingBucketCount -= buckets[flushBucketIndex];
                earliestBucketStartMs += bucketSpanMs;
                earliestBucketIndex = (earliestBucketIndex + 1) % maxSize;
            }
            buckets[flushBucketIndex] = 0L;

            latestBucketStartMs += bucketSpanMs;
            latestBucketIndex = flushBucketIndex;
        }
    }

    private void addToBucket(long bucketStartMs) {
        int offsetToLatest = (int) ((bucketStartMs - latestBucketStartMs) / bucketSpanMs);
        int bucketIndex = (latestBucketIndex + offsetToLatest) % maxSize;
        if (bucketIndex < 0) {
            bucketIndex = maxSize + bucketIndex;
        }

        ++buckets[bucketIndex];
        ++movingBucketCount;

        if (bucketStartMs < earliestBucketStartMs) {
            earliestBucketStartMs = bucketStartMs;
            earliestBucketIndex = bucketIndex;
        }
    }

    private void flush(int bucketIndex) {
        long bucketStartMs = getTimestampMs(bucketIndex);
        if (bucketStartMs > latestFlushedBucketStartMs) {
            bucketFlushListener.onBucketFlush(bucketStartMs, buckets[bucketIndex]);
            latestFlushedBucketStartMs = bucketStartMs;
        }
    }

    private long getTimestampMs(int bucketIndex) {
        int offsetToLatest = latestBucketIndex - bucketIndex;
        if (offsetToLatest < 0) {
            offsetToLatest = maxSize + offsetToLatest;
        }
        return latestBucketStartMs - offsetToLatest * bucketSpanMs;
    }

    void flush() {
        if (latestBucketStartMs < 0) {
            return;
        }

        int bucketIndex = earliestBucketIndex;
        while (bucketIndex != latestBucketIndex) {
            flush(bucketIndex);
            bucketIndex = (bucketIndex + 1) % maxSize;
        }
    }

    double averageBucketCount() {
        return (double) movingBucketCount / size();
    }

    private int size() {
        if (latestBucketStartMs < 0) {
            return 0;
        }
        return (int) ((latestBucketStartMs - earliestBucketStartMs) / bucketSpanMs) + 1;
    }

    interface BucketFlushListener {
        void onBucketFlush(long bucketStartMs, long bucketCounts);
    }
}
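
The wrap-around arithmetic in addToBucket() and getTimestampMs() is the subtle part of the class: a late record maps to a slot behind latestBucketIndex, and a slot's timestamp is recovered from its distance to the latest bucket. The following is a small standalone walk-through of that arithmetic with assumed example values; it mirrors the expressions in the diff above but is not part of the commit:

public class BucketIndexArithmeticDemo {
    public static void main(String[] args) {
        // Assumed example values: 1-minute buckets, a 10-slot window, and the
        // latest bucket starting at 600,000 ms and stored in slot 3.
        long bucketSpanMs = 60_000L;
        int maxSize = 10;
        long latestBucketStartMs = 600_000L;
        int latestBucketIndex = 3;

        // A late record belonging to the bucket that starts at 420,000 ms,
        // i.e. three buckets behind the latest one.
        long bucketStartMs = 420_000L;
        int offsetToLatest = (int) ((bucketStartMs - latestBucketStartMs) / bucketSpanMs); // -3
        int bucketIndex = (latestBucketIndex + offsetToLatest) % maxSize;                  // 0
        if (bucketIndex < 0) {
            bucketIndex = maxSize + bucketIndex; // Java's % can return a negative remainder
        }
        System.out.println("late record lands in slot " + bucketIndex);

        // Recovering the slot's timestamp walks the same distance back from
        // the latest bucket, wrapping in the opposite direction.
        int slotsBack = latestBucketIndex - bucketIndex; // 3
        if (slotsBack < 0) {
            slotsBack = maxSize + slotsBack;
        }
        long recoveredStartMs = latestBucketStartMs - slotsBack * bucketSpanMs;
        System.out.println("slot " + bucketIndex + " starts at " + recoveredStartMs + " ms"); // 420000 ms
    }
}

Note also that the final flush() drains slots from the earliest bucket up to, but not including, the latest one, so the bucket that is still being filled is never reported.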
