Skip to content

Commit b34eef1

Browse files
committed
[ML] Allocate jobs based on JobParams rather than cluster state config (elastic#33994)
1 parent 92979f4 commit b34eef1

File tree

21 files changed

+383
-371
lines changed

21 files changed

+383
-371
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.xpack.core.beats.BeatsFeatureSetUsage;
4747
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
4848
import org.elasticsearch.xpack.core.ml.MlMetadata;
49+
import org.elasticsearch.xpack.core.ml.MlTasks;
4950
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
5051
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
5152
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
@@ -334,9 +335,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
334335
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
335336
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
336337
// ML - Persistent action requests
337-
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
338+
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
338339
StartDatafeedAction.DatafeedParams::new),
339-
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
340+
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
340341
OpenJobAction.JobParams::new),
341342
// ML - Task states
342343
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
@@ -384,9 +385,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
384385
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
385386
parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()),
386387
// ML - Persistent action requests
387-
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
388+
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME),
388389
StartDatafeedAction.DatafeedParams::fromXContent),
389-
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
390+
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME),
390391
OpenJobAction.JobParams::fromXContent),
391392
// ML - Task states
392393
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1313
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1414

15-
import java.util.Collection;
15+
import java.util.List;
1616
import java.util.Set;
1717
import java.util.stream.Collectors;
1818

1919
public final class MlTasks {
2020

21-
public static final String JOB_TASK_PREFIX = "job-";
22-
public static final String DATAFEED_TASK_PREFIX = "datafeed-";
21+
public static final String JOB_TASK_NAME = "xpack/ml/job";
22+
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";
23+
24+
private static final String JOB_TASK_ID_PREFIX = "job-";
25+
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";
2326

2427
private MlTasks() {
2528
}
@@ -29,15 +32,15 @@ private MlTasks() {
2932
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
3033
*/
3134
public static String jobTaskId(String jobId) {
32-
return JOB_TASK_PREFIX + jobId;
35+
return JOB_TASK_ID_PREFIX + jobId;
3336
}
3437

3538
/**
3639
* Namespaces the task ids for datafeeds.
3740
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
3841
*/
3942
public static String datafeedTaskId(String datafeedId) {
40-
return DATAFEED_TASK_PREFIX + datafeedId;
43+
return DATAFEED_TASK_ID_PREFIX + datafeedId;
4144
}
4245

4346
@Nullable
@@ -76,15 +79,41 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
7679
}
7780

7881
/**
79-
* The job Ids of anomaly detector job tasks
80-
* @param tasks Active tasks
82+
* The job Ids of anomaly detector job tasks.
83+
* All anomaly detector jobs are returned regardless of the status of the
84+
* task (OPEN, CLOSED, FAILED etc).
85+
*
86+
* @param tasks Persistent tasks
8187
* @return The job Ids of anomaly detector job tasks
8288
*/
8389
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
84-
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> activeTasks = tasks.tasks();
85-
86-
return activeTasks.stream().filter(t -> t.getId().startsWith(JOB_TASK_PREFIX))
87-
.map(t -> t.getId().substring(JOB_TASK_PREFIX.length()))
90+
return tasks.findTasks(JOB_TASK_NAME, task -> true)
91+
.stream()
92+
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
8893
.collect(Collectors.toSet());
8994
}
95+
96+
/**
97+
* Is there an ml anomaly detector job task for the job {@code jobId}?
98+
* @param jobId The job id
99+
* @param tasks Persistent tasks
100+
* @return
101+
*/
102+
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
103+
return openJobIds(tasks).contains(jobId);
104+
}
105+
106+
/**
107+
* Read the active anomaly detector job tasks.
108+
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
109+
*
110+
* @param tasks Persistent tasks
111+
* @return The job tasks excluding closed and failed jobs
112+
*/
113+
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
114+
return tasks.findTasks(JOB_TASK_NAME, task -> true)
115+
.stream()
116+
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
117+
.collect(Collectors.toList());
118+
}
90119
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.support.master.MasterNodeRequest;
1414
import org.elasticsearch.client.ElasticsearchClient;
1515
import org.elasticsearch.cluster.metadata.MetaData;
16+
import org.elasticsearch.common.Nullable;
1617
import org.elasticsearch.common.ParseField;
1718
import org.elasticsearch.common.Strings;
1819
import org.elasticsearch.common.io.stream.StreamInput;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.tasks.Task;
2728
import org.elasticsearch.xpack.core.XPackPlugin;
2829
import org.elasticsearch.xpack.core.ml.MachineLearningField;
30+
import org.elasticsearch.xpack.core.ml.MlTasks;
2931
import org.elasticsearch.xpack.core.ml.job.config.Job;
3032
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3133

@@ -36,7 +38,7 @@ public class OpenJobAction extends Action<AcknowledgedResponse> {
3638

3739
public static final OpenJobAction INSTANCE = new OpenJobAction();
3840
public static final String NAME = "cluster:admin/xpack/ml/job/open";
39-
public static final String TASK_NAME = "xpack/ml/job";
41+
4042

4143
private OpenJobAction() {
4244
super(NAME);
@@ -132,10 +134,9 @@ public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {
132134

133135
/** TODO Remove in 7.0.0 */
134136
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
135-
136137
public static final ParseField TIMEOUT = new ParseField("timeout");
137-
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, JobParams::new);
138138

139+
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, true, JobParams::new);
139140
static {
140141
PARSER.declareString(JobParams::setJobId, Job.ID);
141142
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
@@ -159,6 +160,7 @@ public static JobParams parseRequest(String jobId, XContentParser parser) {
159160
// A big state can take a while to restore. For symmetry with the _close endpoint any
160161
// changes here should be reflected there too.
161162
private TimeValue timeout = MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT;
163+
private Job job;
162164

163165
JobParams() {
164166
}
@@ -170,6 +172,9 @@ public JobParams(String jobId) {
170172
public JobParams(StreamInput in) throws IOException {
171173
jobId = in.readString();
172174
timeout = TimeValue.timeValueMillis(in.readVLong());
175+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
176+
job = in.readOptionalWriteable(Job::new);
177+
}
173178
}
174179

175180
public String getJobId() {
@@ -188,15 +193,27 @@ public void setTimeout(TimeValue timeout) {
188193
this.timeout = timeout;
189194
}
190195

196+
@Nullable
197+
public Job getJob() {
198+
return job;
199+
}
200+
201+
public void setJob(Job job) {
202+
this.job = job;
203+
}
204+
191205
@Override
192206
public String getWriteableName() {
193-
return TASK_NAME;
207+
return MlTasks.JOB_TASK_NAME;
194208
}
195209

196210
@Override
197211
public void writeTo(StreamOutput out) throws IOException {
198212
out.writeString(jobId);
199213
out.writeVLong(timeout.millis());
214+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
215+
out.writeOptionalWriteable(job);
216+
}
200217
}
201218

202219
@Override
@@ -205,12 +222,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
205222
builder.field(Job.ID.getPreferredName(), jobId);
206223
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
207224
builder.endObject();
225+
// The job field is streamed but not persisted
208226
return builder;
209227
}
210228

211229
@Override
212230
public int hashCode() {
213-
return Objects.hash(jobId, timeout);
231+
return Objects.hash(jobId, timeout, job);
214232
}
215233

216234
@Override
@@ -223,7 +241,8 @@ public boolean equals(Object obj) {
223241
}
224242
OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
225243
return Objects.equals(jobId, other.jobId) &&
226-
Objects.equals(timeout, other.timeout);
244+
Objects.equals(timeout, other.timeout) &&
245+
Objects.equals(job, other.job);
227246
}
228247

229248
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.xcontent.XContentParser;
2727
import org.elasticsearch.index.mapper.DateFieldMapper;
2828
import org.elasticsearch.xpack.core.XPackPlugin;
29+
import org.elasticsearch.xpack.core.ml.MlTasks;
2930
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
3031
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3132
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -42,7 +43,6 @@ public class StartDatafeedAction extends Action<AcknowledgedResponse> {
4243

4344
public static final StartDatafeedAction INSTANCE = new StartDatafeedAction();
4445
public static final String NAME = "cluster:admin/xpack/ml/datafeed/start";
45-
public static final String TASK_NAME = "xpack/ml/datafeed";
4646

4747
private StartDatafeedAction() {
4848
super(NAME);
@@ -141,8 +141,7 @@ public boolean equals(Object obj) {
141141

142142
public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {
143143

144-
public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, DatafeedParams::new);
145-
144+
public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new);
146145
static {
147146
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
148147
PARSER.declareString((params, startTime) -> params.startTime = parseDateOrThrow(
@@ -229,7 +228,7 @@ public void setTimeout(TimeValue timeout) {
229228

230229
@Override
231230
public String getWriteableName() {
232-
return TASK_NAME;
231+
return MlTasks.DATAFEED_TASK_NAME;
233232
}
234233

235234
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import org.elasticsearch.common.xcontent.XContentBuilder;
1313
import org.elasticsearch.common.xcontent.XContentParser;
1414
import org.elasticsearch.persistent.PersistentTaskState;
15-
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
15+
import org.elasticsearch.xpack.core.ml.MlTasks;
1616

1717
import java.io.IOException;
1818
import java.util.Locale;
@@ -23,7 +23,7 @@ public enum DatafeedState implements PersistentTaskState {
2323

2424
STARTED, STOPPED, STARTING, STOPPING;
2525

26-
public static final String NAME = StartDatafeedAction.TASK_NAME;
26+
public static final String NAME = MlTasks.DATAFEED_TASK_NAME;
2727

2828
private static final ConstructingObjectParser<DatafeedState, Void> PARSER =
2929
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.elasticsearch.common.xcontent.XContentParser;
1515
import org.elasticsearch.persistent.PersistentTaskState;
1616
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
17-
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
17+
import org.elasticsearch.xpack.core.ml.MlTasks;
1818

1919
import java.io.IOException;
2020
import java.util.Objects;
@@ -23,7 +23,7 @@
2323

2424
public class JobTaskState implements PersistentTaskState {
2525

26-
public static final String NAME = OpenJobAction.TASK_NAME;
26+
public static final String NAME = MlTasks.JOB_TASK_NAME;
2727

2828
private static ParseField STATE = new ParseField("state");
2929
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");

0 commit comments

Comments
 (0)