Skip to content

Commit 99426eb

Browse files
authored
[ML] Extract persistent task methods from MlMetadata (#32319)
Move ML persistent task helper functions to the new class MlTasks and remove MLMetadataField after moving the string constant to MlMetadata.
1 parent 73a3889 commit 99426eb

40 files changed

+262
-213
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
5353
import org.elasticsearch.xpack.core.action.XPackInfoAction;
5454
import org.elasticsearch.xpack.core.action.XPackUsageAction;
55-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
55+
import org.elasticsearch.xpack.core.ml.MlMetadata;
5656
import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction;
5757
import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction;
5858
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
@@ -197,7 +197,7 @@ public static List<DiscoveryNode> nodesNotReadyForXPackCustomMetadata(ClusterSta
197197
private static boolean alreadyContainsXPackCustomMetadata(ClusterState clusterState) {
198198
final MetaData metaData = clusterState.metaData();
199199
return metaData.custom(LicensesMetaData.TYPE) != null ||
200-
metaData.custom(MLMetadataField.TYPE) != null ||
200+
metaData.custom(MlMetadata.TYPE) != null ||
201201
metaData.custom(WatcherMetaData.TYPE) != null ||
202202
clusterState.custom(TokenMetaData.TYPE) != null;
203203
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MLMetadataField.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555

5656
public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
5757

58+
public static final String TYPE = "ml";
5859
private static final ParseField JOBS_FIELD = new ParseField("jobs");
5960
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
6061

@@ -119,7 +120,7 @@ public Version getMinimalSupportedVersion() {
119120

120121
@Override
121122
public String getWriteableName() {
122-
return MLMetadataField.TYPE;
123+
return TYPE;
123124
}
124125

125126
@Override
@@ -213,7 +214,7 @@ public void writeTo(StreamOutput out) throws IOException {
213214

214215
@Override
215216
public String getWriteableName() {
216-
return MLMetadataField.TYPE;
217+
return TYPE;
217218
}
218219

219220
static Diff<Job> readJobDiffFrom(StreamInput in) throws IOException {
@@ -277,7 +278,7 @@ public Builder putJob(Job job, boolean overwrite) {
277278
public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {
278279
checkJobHasNoDatafeed(jobId);
279280

280-
JobState jobState = MlMetadata.getJobState(jobId, tasks);
281+
JobState jobState = MlTasks.getJobState(jobId, tasks);
281282
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
282283
throw ExceptionsHelper.conflictStatusException("Unexpected job state [" + jobState + "], expected [" +
283284
JobState.CLOSED + " or " + JobState.FAILED + "]");
@@ -362,7 +363,7 @@ private Optional<DatafeedConfig> getDatafeedByJobId(String jobId) {
362363

363364
private void checkDatafeedIsStopped(Supplier<String> msg, String datafeedId, PersistentTasksCustomMetaData persistentTasks) {
364365
if (persistentTasks != null) {
365-
if (persistentTasks.getTask(MLMetadataField.datafeedTaskId(datafeedId)) != null) {
366+
if (persistentTasks.getTask(MlTasks.datafeedTaskId(datafeedId)) != null) {
366367
throw ExceptionsHelper.conflictStatusException(msg.get());
367368
}
368369
}
@@ -399,7 +400,7 @@ public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks,
399400
checkJobHasNoDatafeed(jobId);
400401

401402
if (allowDeleteOpenJob == false) {
402-
PersistentTask<?> jobTask = getJobTask(jobId, tasks);
403+
PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
403404
if (jobTask != null) {
404405
JobTaskState jobTaskState = (JobTaskState) jobTask.getState();
405406
throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is "
@@ -420,56 +421,10 @@ void checkJobHasNoDatafeed(String jobId) {
420421
}
421422
}
422423

423-
/**
424-
* Namespaces the task ids for jobs.
425-
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
426-
*/
427-
public static String jobTaskId(String jobId) {
428-
return "job-" + jobId;
429-
}
430-
431-
@Nullable
432-
public static PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
433-
if (tasks == null) {
434-
return null;
435-
}
436-
return tasks.getTask(jobTaskId(jobId));
437-
}
438-
439-
@Nullable
440-
public static PersistentTask<?> getDatafeedTask(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
441-
if (tasks == null) {
442-
return null;
443-
}
444-
return tasks.getTask(MLMetadataField.datafeedTaskId(datafeedId));
445-
}
446424

447-
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
448-
PersistentTask<?> task = getJobTask(jobId, tasks);
449-
if (task != null) {
450-
JobTaskState jobTaskState = (JobTaskState) task.getState();
451-
if (jobTaskState == null) {
452-
return JobState.OPENING;
453-
}
454-
return jobTaskState.getState();
455-
}
456-
// If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed
457-
return JobState.CLOSED;
458-
}
459-
460-
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
461-
PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
462-
if (task != null && task.getState() != null) {
463-
return (DatafeedState) task.getState();
464-
} else {
465-
// If we haven't started a datafeed then there will be no persistent task,
466-
// which is the same as if the datafeed was't started
467-
return DatafeedState.STOPPED;
468-
}
469-
}
470425

471426
public static MlMetadata getMlMetadata(ClusterState state) {
472-
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE);
427+
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(TYPE);
473428
if (mlMetadata == null) {
474429
return EMPTY_METADATA;
475430
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.ml;
8+
9+
import org.elasticsearch.common.Nullable;
10+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
11+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
12+
import org.elasticsearch.xpack.core.ml.job.config.JobState;
13+
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
14+
15+
public final class MlTasks {
16+
17+
private MlTasks() {
18+
}
19+
20+
/**
21+
* Namespaces the task ids for jobs.
22+
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
23+
*/
24+
public static String jobTaskId(String jobId) {
25+
return "job-" + jobId;
26+
}
27+
28+
/**
29+
* Namespaces the task ids for datafeeds.
30+
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
31+
*/
32+
public static String datafeedTaskId(String datafeedId) {
33+
return "datafeed-" + datafeedId;
34+
}
35+
36+
@Nullable
37+
public static PersistentTasksCustomMetaData.PersistentTask<?> getJobTask(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
38+
return tasks == null ? null : tasks.getTask(jobTaskId(jobId));
39+
}
40+
41+
@Nullable
42+
public static PersistentTasksCustomMetaData.PersistentTask<?> getDatafeedTask(String datafeedId,
43+
@Nullable PersistentTasksCustomMetaData tasks) {
44+
return tasks == null ? null : tasks.getTask(datafeedTaskId(datafeedId));
45+
}
46+
47+
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
48+
PersistentTasksCustomMetaData.PersistentTask<?> task = getJobTask(jobId, tasks);
49+
if (task != null) {
50+
JobTaskState jobTaskState = (JobTaskState) task.getState();
51+
if (jobTaskState == null) {
52+
return JobState.OPENING;
53+
}
54+
return jobTaskState.getState();
55+
}
56+
// If we haven't opened a job than there will be no persistent task, which is the same as if the job was closed
57+
return JobState.CLOSED;
58+
}
59+
60+
public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
61+
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
62+
if (task != null && task.getState() != null) {
63+
return (DatafeedState) task.getState();
64+
} else {
65+
// If we haven't started a datafeed then there will be no persistent task,
66+
// which is the same as if the datafeed was't started
67+
return DatafeedState.STOPPED;
68+
}
69+
}
70+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.elasticsearch.common.xcontent.XContentBuilder;
2020
import org.elasticsearch.common.xcontent.XContentParser;
2121
import org.elasticsearch.tasks.Task;
22-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
22+
import org.elasticsearch.xpack.core.ml.MlTasks;
2323
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2424
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2525

@@ -84,11 +84,8 @@ public String getDatafeedId() {
8484

8585
@Override
8686
public boolean match(Task task) {
87-
String expectedDescription = MLMetadataField.datafeedTaskId(datafeedId);
88-
if (task instanceof StartDatafeedAction.DatafeedTaskMatcher && expectedDescription.equals(task.getDescription())){
89-
return true;
90-
}
91-
return false;
87+
String expectedDescription = MlTasks.datafeedTaskId(datafeedId);
88+
return task instanceof StartDatafeedAction.DatafeedTaskMatcher && expectedDescription.equals(task.getDescription());
9289
}
9390

9491
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.elasticsearch.common.xcontent.XContentBuilder;
2323
import org.elasticsearch.common.xcontent.XContentParser;
2424
import org.elasticsearch.tasks.Task;
25-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
25+
import org.elasticsearch.xpack.core.ml.MlTasks;
2626
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2727
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2828

@@ -125,7 +125,7 @@ public void setAllowNoDatafeeds(boolean allowNoDatafeeds) {
125125
@Override
126126
public boolean match(Task task) {
127127
for (String id : resolvedStartedDatafeedIds) {
128-
String expectedDescription = MLMetadataField.datafeedTaskId(id);
128+
String expectedDescription = MlTasks.datafeedTaskId(id);
129129
if (task instanceof StartDatafeedAction.DatafeedTaskMatcher && expectedDescription.equals(task.getDescription())){
130130
return true;
131131
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.threadpool.ThreadPool;
2828
import org.elasticsearch.transport.TransportService;
2929
import org.elasticsearch.xpack.core.ml.MlMetadata;
30+
import org.elasticsearch.xpack.core.ml.MlTasks;
3031
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
3132
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
3233
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -121,7 +122,7 @@ static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState
121122
private static void addJobAccordingToState(String jobId, PersistentTasksCustomMetaData tasksMetaData,
122123
List<String> openJobs, List<String> closingJobs, List<String> failedJobs) {
123124

124-
JobState jobState = MlMetadata.getJobState(jobId, tasksMetaData);
125+
JobState jobState = MlTasks.getJobState(jobId, tasksMetaData);
125126
switch (jobState) {
126127
case CLOSING:
127128
closingJobs.add(jobId);
@@ -143,15 +144,15 @@ static TransportCloseJobAction.WaitForCloseRequest buildWaitForCloseRequest(List
143144
TransportCloseJobAction.WaitForCloseRequest waitForCloseRequest = new TransportCloseJobAction.WaitForCloseRequest();
144145

145146
for (String jobId : openJobIds) {
146-
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
147+
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
147148
if (jobTask != null) {
148149
auditor.info(jobId, Messages.JOB_AUDIT_CLOSING);
149150
waitForCloseRequest.persistentTaskIds.add(jobTask.getId());
150151
waitForCloseRequest.jobsToFinalize.add(jobId);
151152
}
152153
}
153154
for (String jobId : closingJobIds) {
154-
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
155+
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
155156
if (jobTask != null) {
156157
waitForCloseRequest.persistentTaskIds.add(jobTask.getId());
157158
}
@@ -180,7 +181,7 @@ static void validateJobAndTaskState(String jobId, MlMetadata mlMetadata, Persist
180181

181182
Optional<DatafeedConfig> datafeed = mlMetadata.getDatafeedByJobId(jobId);
182183
if (datafeed.isPresent()) {
183-
DatafeedState datafeedState = MlMetadata.getDatafeedState(datafeed.get().getId(), tasks);
184+
DatafeedState datafeedState = MlTasks.getDatafeedState(datafeed.get().getId(), tasks);
184185
if (datafeedState != DatafeedState.STOPPED) {
185186
throw ExceptionsHelper.conflictStatusException("cannot close job [{}], datafeed hasn't been stopped", jobId);
186187
}
@@ -230,7 +231,7 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
230231
Set<String> executorNodes = new HashSet<>();
231232
PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
232233
for (String resolvedJobId : request.getOpenJobIds()) {
233-
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(resolvedJobId, tasks);
234+
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(resolvedJobId, tasks);
234235
if (jobTask == null || jobTask.isAssigned() == false) {
235236
String message = "Cannot close job [" + resolvedJobId + "] because the job does not have an assigned node." +
236237
" Use force close to close the job";
@@ -312,7 +313,7 @@ private void forceCloseJob(ClusterState currentState, CloseJobAction.Request req
312313
final AtomicArray<Exception> failures = new AtomicArray<>(numberOfJobs);
313314

314315
for (String jobId : jobIdsToForceClose) {
315-
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
316+
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
316317
if (jobTask != null) {
317318
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
318319
persistentTasksService.sendRemoveRequest(jobTask.getId(),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.elasticsearch.threadpool.ThreadPool;
2323
import org.elasticsearch.transport.TransportService;
2424
import org.elasticsearch.xpack.core.XPackPlugin;
25-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
2625
import org.elasticsearch.xpack.core.ml.MlMetadata;
26+
import org.elasticsearch.xpack.core.ml.MlTasks;
2727
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
2828
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
2929
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -86,7 +86,7 @@ private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterSt
8686

8787
private void removeDatafeedTask(DeleteDatafeedAction.Request request, ClusterState state, ActionListener<Boolean> listener) {
8888
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
89-
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlMetadata.getDatafeedTask(request.getDatafeedId(), tasks);
89+
PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(request.getDatafeedId(), tasks);
9090
if (datafeedTask == null) {
9191
listener.onResponse(true);
9292
} else {
@@ -128,7 +128,7 @@ public ClusterState execute(ClusterState currentState) {
128128
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
129129
.removeDatafeed(request.getDatafeedId(), persistentTasks).build();
130130
return ClusterState.builder(currentState).metaData(
131-
MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, newMetadata).build())
131+
MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMetadata).build())
132132
.build();
133133
}
134134
});

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import org.elasticsearch.tasks.Task;
2727
import org.elasticsearch.threadpool.ThreadPool;
2828
import org.elasticsearch.transport.TransportService;
29-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
3029
import org.elasticsearch.xpack.core.ml.MachineLearningField;
3130
import org.elasticsearch.xpack.core.ml.MlMetadata;
31+
import org.elasticsearch.xpack.core.ml.MlTasks;
3232
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
3333
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
3434
import org.elasticsearch.xpack.core.ml.job.persistence.JobStorageDeletionTask;
@@ -177,7 +177,7 @@ private void removePersistentTask(String jobId, ClusterState currentState,
177177
ActionListener<Boolean> listener) {
178178
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
179179

180-
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
180+
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(jobId, tasks);
181181
if (jobTask == null) {
182182
listener.onResponse(null);
183183
} else {
@@ -251,7 +251,7 @@ static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
251251

252252
private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
253253
ClusterState.Builder newState = ClusterState.builder(currentState);
254-
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MLMetadataField.TYPE, builder.build()).build());
254+
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build());
255255
return newState.build();
256256
}
257257
}

0 commit comments

Comments
 (0)