Skip to content

Commit b10eb4c

Browse files
author
Hendrik Muhs
committed
change field to forecasted_jobs, always write it and fix chained merges
1 parent 1b93000 commit b10eb4c

File tree

2 files changed

+109
-20
lines changed

2 files changed

+109
-20
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,23 @@ public class ForecastStats implements ToXContentObject, Writeable {
2424

2525
public static class Fields {
2626
public static final String TOTAL = "total";
27-
public static final String JOBS = "jobs";
27+
public static final String FORECASTED_JOBS = "forecasted_jobs";
2828
public static final String MEMORY = "memory_bytes";
2929
public static final String RUNTIME = "processing_time_ms";
3030
public static final String RECORDS = "records";
3131
public static final String STATUSES = "status";
3232
}
3333

3434
private long total;
35-
private long jobsWithAtleastOneForecast;
35+
private long forecastedJobs;
3636
private StatsAccumulator memoryStats;
3737
private StatsAccumulator recordStats;
3838
private StatsAccumulator runtimeStats;
3939
private CountAccumulator statusCounts;
4040

4141
public ForecastStats() {
4242
this.total = 0;
43-
this.jobsWithAtleastOneForecast = 0;
43+
this.forecastedJobs = 0;
4444
this.memoryStats = new StatsAccumulator();
4545
this.recordStats = new StatsAccumulator();
4646
this.runtimeStats = new StatsAccumulator();
@@ -53,7 +53,7 @@ public ForecastStats() {
5353
public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats,
5454
CountAccumulator statusCounts) {
5555
this.total = total;
56-
this.jobsWithAtleastOneForecast = total > 0 ? 1 : 0;
56+
this.forecastedJobs = total > 0 ? 1 : 0;
5757
this.memoryStats = Objects.requireNonNull(memoryStats);
5858
this.recordStats = Objects.requireNonNull(recordStats);
5959
this.runtimeStats = Objects.requireNonNull(runtimeStats);
@@ -62,7 +62,7 @@ public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator
6262

6363
public ForecastStats(StreamInput in) throws IOException {
6464
this.total = in.readLong();
65-
this.jobsWithAtleastOneForecast = in.readLong();
65+
this.forecastedJobs = in.readLong();
6666
this.memoryStats = new StatsAccumulator(in);
6767
this.recordStats = new StatsAccumulator(in);
6868
this.runtimeStats = new StatsAccumulator(in);
@@ -74,9 +74,7 @@ public void merge(ForecastStats other) {
7474
return;
7575
}
7676
total += other.total;
77-
if (other.total > 0) {
78-
++jobsWithAtleastOneForecast;
79-
}
77+
forecastedJobs += other.forecastedJobs;
8078
memoryStats.merge(other.memoryStats);
8179
recordStats.merge(other.recordStats);
8280
runtimeStats.merge(other.runtimeStats);
@@ -92,38 +90,37 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
9290

9391
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
9492
builder.field(Fields.TOTAL, total);
93+
builder.field(Fields.FORECASTED_JOBS, forecastedJobs);
94+
9595
if (total > 0) {
9696
builder.field(Fields.MEMORY, memoryStats.asMap());
9797
builder.field(Fields.RECORDS, recordStats.asMap());
9898
builder.field(Fields.RUNTIME, runtimeStats.asMap());
9999
builder.field(Fields.STATUSES, statusCounts.asMap());
100100
}
101-
if (jobsWithAtleastOneForecast > 1) {
102-
builder.field(Fields.JOBS, jobsWithAtleastOneForecast);
103-
}
104101

105102
return builder;
106103
}
107104

108105
public Map<String, Object> asMap() {
109106
Map<String, Object> map = new HashMap<>();
110107
map.put(Fields.TOTAL, total);
108+
map.put(Fields.FORECASTED_JOBS, forecastedJobs);
109+
111110
if (total > 0) {
112111
map.put(Fields.MEMORY, memoryStats.asMap());
113112
map.put(Fields.RECORDS, recordStats.asMap());
114113
map.put(Fields.RUNTIME, runtimeStats.asMap());
115114
map.put(Fields.STATUSES, statusCounts.asMap());
116115
}
117-
if (jobsWithAtleastOneForecast > 1) {
118-
map.put(Fields.JOBS, jobsWithAtleastOneForecast);
119-
}
116+
120117
return map;
121118
}
122119

123120
@Override
124121
public void writeTo(StreamOutput out) throws IOException {
125122
out.writeLong(total);
126-
out.writeLong(jobsWithAtleastOneForecast);
123+
out.writeLong(forecastedJobs);
127124
memoryStats.writeTo(out);
128125
recordStats.writeTo(out);
129126
runtimeStats.writeTo(out);
@@ -132,7 +129,7 @@ public void writeTo(StreamOutput out) throws IOException {
132129

133130
@Override
134131
public int hashCode() {
135-
return Objects.hash(total, jobsWithAtleastOneForecast, memoryStats, recordStats, runtimeStats, statusCounts);
132+
return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts);
136133
}
137134

138135
@Override
@@ -146,7 +143,7 @@ public boolean equals(Object obj) {
146143
}
147144

148145
ForecastStats other = (ForecastStats) obj;
149-
return Objects.equals(total, other.total) && Objects.equals(jobsWithAtleastOneForecast, other.jobsWithAtleastOneForecast)
146+
return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs)
150147
&& Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats)
151148
&& Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts);
152149
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ public void testEmpty() throws IOException {
3131
XContentParser parser = createParser(builder);
3232
Map<String, Object> properties = parser.map();
3333
assertTrue(properties.containsKey(Fields.TOTAL));
34+
assertTrue(properties.containsKey(Fields.FORECASTED_JOBS));
3435
assertFalse(properties.containsKey(Fields.MEMORY));
3536
assertFalse(properties.containsKey(Fields.RECORDS));
3637
assertFalse(properties.containsKey(Fields.RUNTIME));
37-
assertFalse(properties.containsKey(Fields.JOBS));
3838
assertFalse(properties.containsKey(Fields.STATUSES));
3939
}
4040

@@ -82,7 +82,7 @@ public void testMerge() {
8282

8383
Map<String, Object> mergedStats = forecastStats.asMap();
8484

85-
assertEquals(2L, mergedStats.get(Fields.JOBS));
85+
assertEquals(2L, mergedStats.get(Fields.FORECASTED_JOBS));
8686
assertEquals(5L, mergedStats.get(Fields.TOTAL));
8787

8888
@SuppressWarnings("unchecked")
@@ -119,6 +119,98 @@ public void testMerge() {
119119
assertEquals(1, mergedCountStats.get("scheduled").longValue());
120120
}
121121

122+
public void testChainedMerge() {
123+
StatsAccumulator memoryStats = new StatsAccumulator();
124+
memoryStats.add(1000);
125+
memoryStats.add(45000);
126+
memoryStats.add(2300);
127+
StatsAccumulator recordStats = new StatsAccumulator();
128+
recordStats.add(10);
129+
recordStats.add(0);
130+
recordStats.add(20);
131+
StatsAccumulator runtimeStats = new StatsAccumulator();
132+
runtimeStats.add(0);
133+
runtimeStats.add(0);
134+
runtimeStats.add(10);
135+
CountAccumulator statusStats = new CountAccumulator();
136+
statusStats.add("finished", 2L);
137+
statusStats.add("failed", 5L);
138+
ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats);
139+
140+
StatsAccumulator memoryStats2 = new StatsAccumulator();
141+
memoryStats2.add(10);
142+
memoryStats2.add(30);
143+
StatsAccumulator recordStats2 = new StatsAccumulator();
144+
recordStats2.add(10);
145+
recordStats2.add(0);
146+
StatsAccumulator runtimeStats2 = new StatsAccumulator();
147+
runtimeStats2.add(96);
148+
runtimeStats2.add(0);
149+
CountAccumulator statusStats2 = new CountAccumulator();
150+
statusStats2.add("finished", 2L);
151+
statusStats2.add("scheduled", 1L);
152+
ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2);
153+
154+
StatsAccumulator memoryStats3 = new StatsAccumulator();
155+
memoryStats3.add(500);
156+
StatsAccumulator recordStats3 = new StatsAccumulator();
157+
recordStats3.add(50);
158+
StatsAccumulator runtimeStats3 = new StatsAccumulator();
159+
runtimeStats3.add(32);
160+
CountAccumulator statusStats3 = new CountAccumulator();
161+
statusStats3.add("finished", 1L);
162+
ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, statusStats3);
163+
164+
ForecastStats forecastStats4 = new ForecastStats();
165+
166+
// merge 4 into 3
167+
forecastStats3.merge(forecastStats4);
168+
169+
// merge 3 into 2
170+
forecastStats2.merge(forecastStats3);
171+
172+
// merger 2 into 1
173+
forecastStats.merge(forecastStats2);
174+
175+
Map<String, Object> mergedStats = forecastStats.asMap();
176+
177+
assertEquals(3L, mergedStats.get(Fields.FORECASTED_JOBS));
178+
assertEquals(6L, mergedStats.get(Fields.TOTAL));
179+
180+
@SuppressWarnings("unchecked")
181+
Map<String, Double> mergedMemoryStats = (Map<String, Double>) mergedStats.get(Fields.MEMORY);
182+
183+
assertTrue(mergedMemoryStats != null);
184+
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.AVG), equalTo(8140.0));
185+
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MAX), equalTo(45000.0));
186+
assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MIN), equalTo(10.0));
187+
188+
@SuppressWarnings("unchecked")
189+
Map<String, Double> mergedRecordStats = (Map<String, Double>) mergedStats.get(Fields.RECORDS);
190+
191+
assertTrue(mergedRecordStats != null);
192+
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.AVG), equalTo(15.0));
193+
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MAX), equalTo(50.0));
194+
assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0));
195+
196+
@SuppressWarnings("unchecked")
197+
Map<String, Double> mergedRuntimeStats = (Map<String, Double>) mergedStats.get(Fields.RUNTIME);
198+
199+
assertTrue(mergedRuntimeStats != null);
200+
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.AVG), equalTo(23.0));
201+
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MAX), equalTo(96.0));
202+
assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0));
203+
204+
@SuppressWarnings("unchecked")
205+
Map<String, Long> mergedCountStats = (Map<String, Long>) mergedStats.get(Fields.STATUSES);
206+
207+
assertTrue(mergedCountStats != null);
208+
assertEquals(3, mergedCountStats.size());
209+
assertEquals(5, mergedCountStats.get("finished").longValue());
210+
assertEquals(5, mergedCountStats.get("failed").longValue());
211+
assertEquals(1, mergedCountStats.get("scheduled").longValue());
212+
}
213+
122214
public void testUniqueCountOfJobs() {
123215
ForecastStats forecastStats = createForecastStats(5, 10);
124216
ForecastStats forecastStats2 = createForecastStats(2, 8);
@@ -131,7 +223,7 @@ public void testUniqueCountOfJobs() {
131223
forecastStats.merge(forecastStats4);
132224
forecastStats.merge(forecastStats5);
133225

134-
assertEquals(3L, forecastStats.asMap().get(Fields.JOBS));
226+
assertEquals(3L, forecastStats.asMap().get(Fields.FORECASTED_JOBS));
135227
}
136228

137229
@Override

0 commit comments

Comments
 (0)