Skip to content

Commit 2b6cd7a

Browse files
author
David Roberts
authored
[ML] Reimplement established model memory (#35263)
This is the 6.6/6.7 implementation of a master node service to keep track of the native process memory requirement of each ML job with an associated native process. The new ML memory tracker service works when the whole cluster is upgraded to at least version 6.6. For mixed version clusters the old mechanism of established model memory stored on the job in cluster state is used. This means that the old (and complex) code to keep established model memory up to date on the job object cannot yet be removed. When this change is forward ported to 7.0 the old way of keeping established model memory updated will be removed.
1 parent 3a0a5e7 commit 2b6cd7a

File tree

13 files changed

+853
-65
lines changed

13 files changed

+853
-65
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,24 +57,28 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
5757
public static final String TYPE = "ml";
5858
private static final ParseField JOBS_FIELD = new ParseField("jobs");
5959
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
60+
private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version");
6061

61-
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap());
62+
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null);
6263
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
6364
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);
6465

6566
static {
6667
LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD);
6768
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
6869
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
70+
LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD);
6971
}
7072

7173
private final SortedMap<String, Job> jobs;
7274
private final SortedMap<String, DatafeedConfig> datafeeds;
75+
private final Long lastMemoryRefreshVersion;
7376
private final GroupOrJobLookup groupOrJobLookup;
7477

75-
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds) {
78+
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, Long lastMemoryRefreshVersion) {
7679
this.jobs = Collections.unmodifiableSortedMap(jobs);
7780
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
81+
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
7882
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
7983
}
8084

@@ -112,6 +116,10 @@ public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds
112116
.expand(expression, allowNoDatafeeds);
113117
}
114118

119+
public Long getLastMemoryRefreshVersion() {
120+
return lastMemoryRefreshVersion;
121+
}
122+
115123
@Override
116124
public Version getMinimalSupportedVersion() {
117125
return Version.V_5_4_0;
@@ -145,14 +153,21 @@ public MlMetadata(StreamInput in) throws IOException {
145153
datafeeds.put(in.readString(), new DatafeedConfig(in));
146154
}
147155
this.datafeeds = datafeeds;
148-
156+
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
157+
lastMemoryRefreshVersion = in.readOptionalLong();
158+
} else {
159+
lastMemoryRefreshVersion = null;
160+
}
149161
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
150162
}
151163

152164
@Override
153165
public void writeTo(StreamOutput out) throws IOException {
154166
writeMap(jobs, out);
155167
writeMap(datafeeds, out);
168+
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
169+
out.writeOptionalLong(lastMemoryRefreshVersion);
170+
}
156171
}
157172

158173
private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
@@ -169,6 +184,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
169184
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
170185
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
171186
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
187+
if (lastMemoryRefreshVersion != null) {
188+
builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion);
189+
}
172190
return builder;
173191
}
174192

@@ -185,30 +203,46 @@ public static class MlMetadataDiff implements NamedDiff<MetaData.Custom> {
185203

186204
final Diff<Map<String, Job>> jobs;
187205
final Diff<Map<String, DatafeedConfig>> datafeeds;
206+
final Long lastMemoryRefreshVersion;
188207

189208
MlMetadataDiff(MlMetadata before, MlMetadata after) {
190209
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
191210
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
211+
this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion;
192212
}
193213

194214
public MlMetadataDiff(StreamInput in) throws IOException {
195215
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
196216
MlMetadataDiff::readJobDiffFrom);
197217
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
198-
MlMetadataDiff::readSchedulerDiffFrom);
218+
MlMetadataDiff::readDatafeedDiffFrom);
219+
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
220+
lastMemoryRefreshVersion = in.readOptionalLong();
221+
} else {
222+
lastMemoryRefreshVersion = null;
223+
}
199224
}
200225

226+
/**
227+
* Merge the diff with the ML metadata.
228+
* @param part The current ML metadata.
229+
* @return The new ML metadata.
230+
*/
201231
@Override
202232
public MetaData.Custom apply(MetaData.Custom part) {
203233
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
204234
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
205-
return new MlMetadata(newJobs, newDatafeeds);
235+
// lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value
236+
return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion);
206237
}
207238

208239
@Override
209240
public void writeTo(StreamOutput out) throws IOException {
210241
jobs.writeTo(out);
211242
datafeeds.writeTo(out);
243+
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
244+
out.writeOptionalLong(lastMemoryRefreshVersion);
245+
}
212246
}
213247

214248
@Override
@@ -220,7 +254,7 @@ static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
220254
return AbstractDiffable.readDiffFrom(Job::new, in);
221255
}
222256

223-
static Diff<DatafeedConfig> readSchedulerDiffFrom(StreamInput in) throws IOException {
257+
static Diff<DatafeedConfig> readDatafeedDiffFrom(StreamInput in) throws IOException {
224258
return AbstractDiffable.readDiffFrom(DatafeedConfig::new, in);
225259
}
226260
}
@@ -233,7 +267,8 @@ public boolean equals(Object o) {
233267
return false;
234268
MlMetadata that = (MlMetadata) o;
235269
return Objects.equals(jobs, that.jobs) &&
236-
Objects.equals(datafeeds, that.datafeeds);
270+
Objects.equals(datafeeds, that.datafeeds) &&
271+
Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion);
237272
}
238273

239274
@Override
@@ -243,13 +278,14 @@ public final String toString() {
243278

244279
@Override
245280
public int hashCode() {
246-
return Objects.hash(jobs, datafeeds);
281+
return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion);
247282
}
248283

249284
public static class Builder {
250285

251286
private TreeMap<String, Job> jobs;
252287
private TreeMap<String, DatafeedConfig> datafeeds;
288+
private Long lastMemoryRefreshVersion;
253289

254290
public Builder() {
255291
jobs = new TreeMap<>();
@@ -263,6 +299,7 @@ public Builder(@Nullable MlMetadata previous) {
263299
} else {
264300
jobs = new TreeMap<>(previous.jobs);
265301
datafeeds = new TreeMap<>(previous.datafeeds);
302+
lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion;
266303
}
267304
}
268305

@@ -382,8 +419,13 @@ private Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
382419
return this;
383420
}
384421

422+
public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) {
423+
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
424+
return this;
425+
}
426+
385427
public MlMetadata build() {
386-
return new MlMetadata(jobs, datafeeds);
428+
return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion);
387429
}
388430

389431
public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {
@@ -420,8 +462,6 @@ void checkJobHasNoDatafeed(String jobId) {
420462
}
421463
}
422464

423-
424-
425465
public static MlMetadata getMlMetadata(ClusterState state) {
426466
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
427467
if (mlMetadata == null) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
142142
private final Date createTime;
143143
private final Date finishedTime;
144144
private final Date lastDataTime;
145+
// TODO: Remove in 7.0
145146
private final Long establishedModelMemory;
146147
private final AnalysisConfig analysisConfig;
147148
private final AnalysisLimits analysisLimits;
@@ -439,6 +440,7 @@ public Collection<String> allInputFields() {
439440
* program code and stack.
440441
* @return an estimate of the memory requirement of this job, in bytes
441442
*/
443+
// TODO: remove this method in 7.0
442444
public long estimateMemoryFootprint() {
443445
if (establishedModelMemory != null && establishedModelMemory > 0) {
444446
return establishedModelMemory + PROCESS_MEMORY_OVERHEAD.getBytes();
@@ -658,6 +660,7 @@ public static class Builder implements Writeable, ToXContentObject {
658660
private Date createTime;
659661
private Date finishedTime;
660662
private Date lastDataTime;
663+
// TODO: remove in 7.0
661664
private Long establishedModelMemory;
662665
private ModelPlotConfig modelPlotConfig;
663666
private Long renormalizationWindowDays;
@@ -1102,10 +1105,6 @@ private void validateGroups() {
11021105
public Job build(Date createTime) {
11031106
setCreateTime(createTime);
11041107
setJobVersion(Version.CURRENT);
1105-
// TODO: Maybe we _could_ accept a value for this supplied at create time - it would
1106-
// mean cloned jobs that hadn't been edited much would start with an accurate expected size.
1107-
// But on the other hand it would mean jobs that were cloned and then completely changed
1108-
// would start with a size that was completely wrong.
11091108
setEstablishedModelMemory(null);
11101109
return build();
11111110
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ public void testEstimateMemoryFootprint_GivenNoLimitAndNotEstablished() {
561561
builder.setEstablishedModelMemory(0L);
562562
}
563563
assertEquals(ByteSizeUnit.MB.toBytes(AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB)
564-
+ Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint());
564+
+ Job.PROCESS_MEMORY_OVERHEAD.getBytes(), builder.build().estimateMemoryFootprint());
565565
}
566566

567567
public void testEarliestValidTimestamp_GivenEmptyDataCounts() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
182182
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
183183
import org.elasticsearch.xpack.ml.notifications.Auditor;
184+
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
184185
import org.elasticsearch.xpack.ml.process.NativeController;
185186
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
186187
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
@@ -278,6 +279,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
278279

279280
private final SetOnce<AutodetectProcessManager> autodetectProcessManager = new SetOnce<>();
280281
private final SetOnce<DatafeedManager> datafeedManager = new SetOnce<>();
282+
private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
281283

282284
public MachineLearning(Settings settings, Path configPath) {
283285
this.settings = settings;
@@ -420,6 +422,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
420422
this.datafeedManager.set(datafeedManager);
421423
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
422424
autodetectProcessManager);
425+
MlMemoryTracker memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider);
426+
this.memoryTracker.set(memoryTracker);
423427

424428
// This object's constructor attaches to the license state, so there's no need to retain another reference to it
425429
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
@@ -438,7 +442,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
438442
jobDataCountsPersister,
439443
datafeedManager,
440444
auditor,
441-
new MlAssignmentNotifier(auditor, clusterService)
445+
new MlAssignmentNotifier(auditor, clusterService),
446+
memoryTracker
442447
);
443448
}
444449

@@ -449,7 +454,8 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
449454
}
450455

451456
return Arrays.asList(
452-
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get()),
457+
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(),
458+
memoryTracker.get()),
453459
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, datafeedManager.get())
454460
);
455461
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
7070
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
7171
import org.elasticsearch.xpack.ml.notifications.Auditor;
72+
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
7273
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
7374

7475
import java.util.ArrayList;
@@ -94,6 +95,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
9495
private final JobResultsProvider jobResultsProvider;
9596
private final JobConfigProvider jobConfigProvider;
9697
private final DatafeedConfigProvider datafeedConfigProvider;
98+
private final MlMemoryTracker memoryTracker;
9799

98100
/**
99101
* A map of task listeners by job_id.
@@ -108,7 +110,8 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
108110
ThreadPool threadPool, ActionFilters actionFilters,
109111
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
110112
Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
111-
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider) {
113+
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
114+
MlMemoryTracker memoryTracker) {
112115
super(settings, DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
113116
indexNameExpressionResolver, DeleteJobAction.Request::new);
114117
this.client = client;
@@ -117,6 +120,7 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
117120
this.jobResultsProvider = jobResultsProvider;
118121
this.jobConfigProvider = jobConfigProvider;
119122
this.datafeedConfigProvider = datafeedConfigProvider;
123+
this.memoryTracker = memoryTracker;
120124
this.listenersByJobId = new HashMap<>();
121125
}
122126

@@ -211,6 +215,9 @@ private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJ
211215
ActionListener<AcknowledgedResponse> listener) {
212216
String jobId = request.getJobId();
213217

218+
// We clean up the memory tracker on delete rather than close as close is not a master node action
219+
memoryTracker.removeJob(jobId);
220+
214221
// Step 4. When the job has been removed from the cluster state, return a response
215222
// -------
216223
CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {

0 commit comments

Comments
 (0)