Skip to content

Commit dd33193

Browse files
[ML] Parse and report memory usage for DF Analytics (#52778)
Adds reporting of memory usage for data frame analytics jobs. This commit introduces a new index pattern `.ml-stats-*` whose first concrete index will be `.ml-stats-000001`. This index serves to store instrumentation information for those jobs.
1 parent a095115 commit dd33193

File tree

29 files changed

+921
-218
lines changed

29 files changed

+921
-218
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStats.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
4444
static final ParseField STATE = new ParseField("state");
4545
static final ParseField FAILURE_REASON = new ParseField("failure_reason");
4646
static final ParseField PROGRESS = new ParseField("progress");
47+
static final ParseField MEMORY_USAGE = new ParseField("memory_usage");
4748
static final ParseField NODE = new ParseField("node");
4849
static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation");
4950

@@ -55,8 +56,9 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
5556
(DataFrameAnalyticsState) args[1],
5657
(String) args[2],
5758
(List<PhaseProgress>) args[3],
58-
(NodeAttributes) args[4],
59-
(String) args[5]));
59+
(MemoryUsage) args[4],
60+
(NodeAttributes) args[5],
61+
(String) args[6]));
6062

6163
static {
6264
PARSER.declareString(constructorArg(), ID);
@@ -68,6 +70,7 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
6870
}, STATE, ObjectParser.ValueType.STRING);
6971
PARSER.declareString(optionalConstructorArg(), FAILURE_REASON);
7072
PARSER.declareObjectArray(optionalConstructorArg(), PhaseProgress.PARSER, PROGRESS);
73+
PARSER.declareObject(optionalConstructorArg(), MemoryUsage.PARSER, MEMORY_USAGE);
7174
PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE);
7275
PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
7376
}
@@ -76,16 +79,18 @@ public static DataFrameAnalyticsStats fromXContent(XContentParser parser) throws
7679
private final DataFrameAnalyticsState state;
7780
private final String failureReason;
7881
private final List<PhaseProgress> progress;
82+
private final MemoryUsage memoryUsage;
7983
private final NodeAttributes node;
8084
private final String assignmentExplanation;
8185

8286
public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason,
83-
@Nullable List<PhaseProgress> progress, @Nullable NodeAttributes node,
84-
@Nullable String assignmentExplanation) {
87+
@Nullable List<PhaseProgress> progress, @Nullable MemoryUsage memoryUsage,
88+
@Nullable NodeAttributes node, @Nullable String assignmentExplanation) {
8589
this.id = id;
8690
this.state = state;
8791
this.failureReason = failureReason;
8892
this.progress = progress;
93+
this.memoryUsage = memoryUsage;
8994
this.node = node;
9095
this.assignmentExplanation = assignmentExplanation;
9196
}
@@ -106,6 +111,11 @@ public List<PhaseProgress> getProgress() {
106111
return progress;
107112
}
108113

114+
@Nullable
115+
public MemoryUsage getMemoryUsage() {
116+
return memoryUsage;
117+
}
118+
109119
public NodeAttributes getNode() {
110120
return node;
111121
}
@@ -124,13 +134,14 @@ public boolean equals(Object o) {
124134
&& Objects.equals(state, other.state)
125135
&& Objects.equals(failureReason, other.failureReason)
126136
&& Objects.equals(progress, other.progress)
137+
&& Objects.equals(memoryUsage, other.memoryUsage)
127138
&& Objects.equals(node, other.node)
128139
&& Objects.equals(assignmentExplanation, other.assignmentExplanation);
129140
}
130141

131142
@Override
132143
public int hashCode() {
133-
return Objects.hash(id, state, failureReason, progress, node, assignmentExplanation);
144+
return Objects.hash(id, state, failureReason, progress, memoryUsage, node, assignmentExplanation);
134145
}
135146

136147
@Override
@@ -140,6 +151,7 @@ public String toString() {
140151
.add("state", state)
141152
.add("failureReason", failureReason)
142153
.add("progress", progress)
154+
.add("memoryUsage", memoryUsage)
143155
.add("node", node)
144156
.add("assignmentExplanation", assignmentExplanation)
145157
.toString();
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml.dataframe;
20+
21+
import org.elasticsearch.client.common.TimeUtil;
22+
import org.elasticsearch.common.ParseField;
23+
import org.elasticsearch.common.inject.internal.ToStringBuilder;
24+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
25+
import org.elasticsearch.common.xcontent.ObjectParser;
26+
import org.elasticsearch.common.xcontent.ToXContentObject;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
29+
import java.io.IOException;
30+
import java.time.Instant;
31+
import java.util.Objects;
32+
33+
public class MemoryUsage implements ToXContentObject {
34+
35+
static final ParseField TIMESTAMP = new ParseField("timestamp");
36+
static final ParseField PEAK_USAGE_BYTES = new ParseField("peak_usage_bytes");
37+
38+
public static final ConstructingObjectParser<MemoryUsage, Void> PARSER = new ConstructingObjectParser<>("analytics_memory_usage",
39+
true, a -> new MemoryUsage((Instant) a[0], (long) a[1]));
40+
41+
static {
42+
PARSER.declareField(ConstructingObjectParser.constructorArg(),
43+
p -> TimeUtil.parseTimeFieldToInstant(p, TIMESTAMP.getPreferredName()),
44+
TIMESTAMP,
45+
ObjectParser.ValueType.VALUE);
46+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES);
47+
}
48+
49+
private final Instant timestamp;
50+
private final long peakUsageBytes;
51+
52+
public MemoryUsage(Instant timestamp, long peakUsageBytes) {
53+
this.timestamp = Instant.ofEpochMilli(Objects.requireNonNull(timestamp).toEpochMilli());
54+
this.peakUsageBytes = peakUsageBytes;
55+
}
56+
57+
@Override
58+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
59+
builder.startObject();
60+
builder.timeField(TIMESTAMP.getPreferredName(), TIMESTAMP.getPreferredName() + "_string", timestamp.toEpochMilli());
61+
builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
62+
builder.endObject();
63+
return builder;
64+
}
65+
66+
@Override
67+
public boolean equals(Object o) {
68+
if (o == this) return true;
69+
if (o == null || getClass() != o.getClass()) return false;
70+
71+
MemoryUsage other = (MemoryUsage) o;
72+
return Objects.equals(timestamp, other.timestamp)
73+
&& peakUsageBytes == other.peakUsageBytes;
74+
}
75+
76+
@Override
77+
public int hashCode() {
78+
return Objects.hash(timestamp, peakUsageBytes);
79+
}
80+
81+
@Override
82+
public String toString() {
83+
return new ToStringBuilder(getClass())
84+
.add(TIMESTAMP.getPreferredName(), timestamp.getEpochSecond())
85+
.add(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes)
86+
.toString();
87+
}
88+
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1506,6 +1506,7 @@ public void testGetDataFrameAnalyticsStats() throws Exception {
15061506
assertThat(progress.get(1), equalTo(new PhaseProgress("loading_data", 0)));
15071507
assertThat(progress.get(2), equalTo(new PhaseProgress("analyzing", 0)));
15081508
assertThat(progress.get(3), equalTo(new PhaseProgress("writing_results", 0)));
1509+
assertThat(stats.getMemoryUsage(), is(nullValue()));
15091510
}
15101511

15111512
public void testStartDataFrameAnalyticsConfig() throws Exception {

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/dataframe/DataFrameAnalyticsStatsTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public static DataFrameAnalyticsStats randomDataFrameAnalyticsStats() {
4747
randomFrom(DataFrameAnalyticsState.values()),
4848
randomBoolean() ? null : randomAlphaOfLength(10),
4949
randomBoolean() ? null : createRandomProgress(),
50+
randomBoolean() ? null : MemoryUsageTests.createRandom(),
5051
randomBoolean() ? null : NodeAttributesTests.createRandom(),
5152
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20));
5253
}
@@ -70,6 +71,9 @@ public static void toXContent(DataFrameAnalyticsStats stats, XContentBuilder bui
7071
if (stats.getProgress() != null) {
7172
builder.field(DataFrameAnalyticsStats.PROGRESS.getPreferredName(), stats.getProgress());
7273
}
74+
if (stats.getMemoryUsage() != null) {
75+
builder.field(DataFrameAnalyticsStats.MEMORY_USAGE.getPreferredName(), stats.getMemoryUsage());
76+
}
7377
if (stats.getNode() != null) {
7478
builder.field(DataFrameAnalyticsStats.NODE.getPreferredName(), stats.getNode());
7579
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml.dataframe;
20+
21+
import org.elasticsearch.common.xcontent.XContentParser;
22+
import org.elasticsearch.test.AbstractXContentTestCase;
23+
24+
import java.io.IOException;
25+
import java.time.Instant;
26+
27+
public class MemoryUsageTests extends AbstractXContentTestCase<MemoryUsage> {
28+
29+
@Override
30+
protected MemoryUsage createTestInstance() {
31+
return createRandom();
32+
}
33+
34+
public static MemoryUsage createRandom() {
35+
return new MemoryUsage(Instant.now(), randomNonNegativeLong());
36+
}
37+
38+
@Override
39+
protected MemoryUsage doParseInstance(XContentParser parser) throws IOException {
40+
return MemoryUsage.PARSER.apply(parser, null);
41+
}
42+
43+
@Override
44+
protected boolean supportsUnknownFields() {
45+
return true;
46+
}
47+
}

docs/reference/ml/ml-shared.asciidoc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,13 +451,25 @@ sorted by the `id` value in ascending order.
451451
`progress`:::
452452
(array) The progress report of the {dfanalytics-job} by phase.
453453

454-
`phase`:::
454+
`phase`::::
455455
(string) Defines the phase of the {dfanalytics-job}. Possible phases:
456456
`reindexing`, `loading_data`, `analyzing`, and `writing_results`.
457457

458-
`progress_percent`:::
458+
`progress_percent`::::
459459
(integer) The progress that the {dfanalytics-job} has made expressed in
460460
percentage.
461+
462+
`memory_usage`:::
463+
(Optional, Object) An object describing memory usage of the analytics.
464+
It will be present only after the job has started and memory usage has
465+
been reported.
466+
467+
`timestamp`::::
468+
(date) The timestamp when memory usage was calculated.
469+
470+
`peak_usage_bytes`::::
471+
(long) The number of bytes used at the highest peak of memory usage.
472+
461473
end::data-frame-analytics-stats[]
462474

463475
tag::datafeed-id[]

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/time/TimeUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ private TimeUtils() {
2222
// Do nothing
2323
}
2424

25+
/**
26+
* @deprecated Please use {@link #parseTimeFieldToInstant(XContentParser, String)} instead.
27+
*/
28+
@Deprecated
2529
public static Date parseTimeField(XContentParser parser, String fieldName) throws IOException {
2630
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
2731
return new Date(parser.longValue());
@@ -36,7 +40,7 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel
3640
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
3741
return Instant.ofEpochMilli(parser.longValue());
3842
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
39-
return Instant.ofEpochMilli(dateStringToEpoch(parser.text()));
43+
return Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
4044
}
4145
throw new IllegalArgumentException(
4246
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
@@ -54,6 +58,7 @@ public static Instant parseTimeFieldToInstant(XContentParser parser, String fiel
5458
* @return The epoch time in milliseconds or -1 if the date cannot be
5559
* parsed.
5660
*/
61+
@Deprecated
5762
public static long dateStringToEpoch(String date) {
5863
try {
5964
long epoch = Long.parseLong(date);
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ml;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
13+
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
14+
import org.elasticsearch.xpack.core.template.TemplateUtils;
15+
16+
/**
17+
* Describes the indices where ML is storing various stats about the users jobs.
18+
*/
19+
public class MlStatsIndex {
20+
21+
public static final String TEMPLATE_NAME = ".ml-stats";
22+
23+
private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
24+
25+
private MlStatsIndex() {}
26+
27+
public static String mapping() {
28+
return TemplateUtils.loadTemplate("/org/elasticsearch/xpack/core/ml/stats_index_mappings.json",
29+
Version.CURRENT.toString(), MAPPINGS_VERSION_VARIABLE);
30+
}
31+
32+
public static String indexPattern() {
33+
return TEMPLATE_NAME + "-*";
34+
}
35+
36+
public static String writeAlias() {
37+
return ".ml-stats-write";
38+
}
39+
40+
/**
41+
* Creates the first concrete .ml-stats-000001 index (if necessary)
42+
* Creates the .ml-stats-write alias for that index.
43+
* The listener will be notified with a boolean to indicate if the index was created because of this call,
44+
* but unless there is a failure after this method returns the index and alias should be present.
45+
*/
46+
public static void createStatsIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver,
47+
ActionListener<Boolean> listener) {
48+
MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver, TEMPLATE_NAME, writeAlias(), listener);
49+
}
50+
}

0 commit comments

Comments
 (0)