Skip to content

Commit 3703fbb

Browse files
committed
[ML] Allocate jobs based on JobParams rather than cluster state config (#33994)
1 parent 07cceaf commit 3703fbb

File tree

21 files changed

+376
-367
lines changed

21 files changed

+376
-367
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.elasticsearch.xpack.core.beats.BeatsFeatureSetUsage;
4646
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
4747
import org.elasticsearch.xpack.core.ml.MlMetadata;
48+
import org.elasticsearch.xpack.core.ml.MlTasks;
4849
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
4950
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
5051
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
@@ -334,9 +335,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
334335
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
335336
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
336337
// ML - Persistent action requests
337-
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
338+
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
338339
StartDatafeedAction.DatafeedParams::new),
339-
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
340+
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
340341
OpenJobAction.JobParams::new),
341342
// ML - Task states
342343
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
@@ -384,9 +385,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
384385
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
385386
parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()),
386387
// ML - Persistent action requests
387-
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
388+
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME),
388389
StartDatafeedAction.DatafeedParams::fromXContent),
389-
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
390+
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME),
390391
OpenJobAction.JobParams::fromXContent),
391392
// ML - Task states
392393
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@
1212
import org.elasticsearch.xpack.core.ml.job.config.JobState;
1313
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
1414

15-
import java.util.Collection;
15+
import java.util.List;
1616
import java.util.Set;
1717
import java.util.stream.Collectors;
1818

1919
public final class MlTasks {
2020

21-
public static final String JOB_TASK_PREFIX = "job-";
22-
public static final String DATAFEED_TASK_PREFIX = "datafeed-";
21+
public static final String JOB_TASK_NAME = "xpack/ml/job";
22+
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";
23+
24+
private static final String JOB_TASK_ID_PREFIX = "job-";
25+
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";
2326

2427
private MlTasks() {
2528
}
@@ -29,15 +32,15 @@ private MlTasks() {
2932
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
3033
*/
3134
public static String jobTaskId(String jobId) {
32-
return JOB_TASK_PREFIX + jobId;
35+
return JOB_TASK_ID_PREFIX + jobId;
3336
}
3437

3538
/**
3639
* Namespaces the task ids for datafeeds.
3740
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
3841
*/
3942
public static String datafeedTaskId(String datafeedId) {
40-
return DATAFEED_TASK_PREFIX + datafeedId;
43+
return DATAFEED_TASK_ID_PREFIX + datafeedId;
4144
}
4245

4346
@Nullable
@@ -76,15 +79,41 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
7679
}
7780

7881
/**
79-
* The job Ids of anomaly detector job tasks
80-
* @param tasks Active tasks
82+
* The job Ids of anomaly detector job tasks.
83+
* All anomaly detector jobs are returned regardless of the status of the
84+
* task (OPEN, CLOSED, FAILED etc).
85+
*
86+
* @param tasks Persistent tasks
8187
* @return The job Ids of anomaly detector job tasks
8288
*/
8389
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
84-
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> activeTasks = tasks.tasks();
85-
86-
return activeTasks.stream().filter(t -> t.getId().startsWith(JOB_TASK_PREFIX))
87-
.map(t -> t.getId().substring(JOB_TASK_PREFIX.length()))
90+
return tasks.findTasks(JOB_TASK_NAME, task -> true)
91+
.stream()
92+
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
8893
.collect(Collectors.toSet());
8994
}
95+
96+
/**
97+
* Is there an ml anomaly detector job task for the job {@code jobId}?
98+
* @param jobId The job id
99+
* @param tasks Persistent tasks
100+
* @return
101+
*/
102+
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
103+
return openJobIds(tasks).contains(jobId);
104+
}
105+
106+
/**
107+
* Read the active anomaly detector job tasks.
108+
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
109+
*
110+
* @param tasks Persistent tasks
111+
* @return The job tasks excluding closed and failed jobs
112+
*/
113+
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
114+
return tasks.findTasks(JOB_TASK_NAME, task -> true)
115+
.stream()
116+
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
117+
.collect(Collectors.toList());
118+
}
90119
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/OpenJobAction.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.support.master.MasterNodeRequest;
1414
import org.elasticsearch.client.ElasticsearchClient;
1515
import org.elasticsearch.cluster.metadata.MetaData;
16+
import org.elasticsearch.common.Nullable;
1617
import org.elasticsearch.common.ParseField;
1718
import org.elasticsearch.common.Strings;
1819
import org.elasticsearch.common.io.stream.StreamInput;
@@ -25,6 +26,7 @@
2526
import org.elasticsearch.tasks.Task;
2627
import org.elasticsearch.xpack.core.XPackPlugin;
2728
import org.elasticsearch.xpack.core.ml.MachineLearningField;
29+
import org.elasticsearch.xpack.core.ml.MlTasks;
2830
import org.elasticsearch.xpack.core.ml.job.config.Job;
2931
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3032

@@ -35,7 +37,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, AcknowledgedRes
3537

3638
public static final OpenJobAction INSTANCE = new OpenJobAction();
3739
public static final String NAME = "cluster:admin/xpack/ml/job/open";
38-
public static final String TASK_NAME = "xpack/ml/job";
40+
3941

4042
private OpenJobAction() {
4143
super(NAME);
@@ -136,10 +138,9 @@ public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {
136138

137139
/** TODO Remove in 7.0.0 */
138140
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
139-
140141
public static final ParseField TIMEOUT = new ParseField("timeout");
141-
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, JobParams::new);
142142

143+
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, true, JobParams::new);
143144
static {
144145
PARSER.declareString(JobParams::setJobId, Job.ID);
145146
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
@@ -163,6 +164,7 @@ public static JobParams parseRequest(String jobId, XContentParser parser) {
163164
// A big state can take a while to restore. For symmetry with the _close endpoint any
164165
// changes here should be reflected there too.
165166
private TimeValue timeout = MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT;
167+
private Job job;
166168

167169
JobParams() {
168170
}
@@ -178,6 +180,9 @@ public JobParams(StreamInput in) throws IOException {
178180
in.readBoolean();
179181
}
180182
timeout = TimeValue.timeValueMillis(in.readVLong());
183+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
184+
job = in.readOptionalWriteable(Job::new);
185+
}
181186
}
182187

183188
public String getJobId() {
@@ -196,9 +201,18 @@ public void setTimeout(TimeValue timeout) {
196201
this.timeout = timeout;
197202
}
198203

204+
@Nullable
205+
public Job getJob() {
206+
return job;
207+
}
208+
209+
public void setJob(Job job) {
210+
this.job = job;
211+
}
212+
199213
@Override
200214
public String getWriteableName() {
201-
return TASK_NAME;
215+
return MlTasks.JOB_TASK_NAME;
202216
}
203217

204218
@Override
@@ -209,6 +223,9 @@ public void writeTo(StreamOutput out) throws IOException {
209223
out.writeBoolean(true);
210224
}
211225
out.writeVLong(timeout.millis());
226+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
227+
out.writeOptionalWriteable(job);
228+
}
212229
}
213230

214231
@Override
@@ -217,12 +234,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
217234
builder.field(Job.ID.getPreferredName(), jobId);
218235
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
219236
builder.endObject();
237+
// The job field is streamed but not persisted
220238
return builder;
221239
}
222240

223241
@Override
224242
public int hashCode() {
225-
return Objects.hash(jobId, timeout);
243+
return Objects.hash(jobId, timeout, job);
226244
}
227245

228246
@Override
@@ -235,7 +253,8 @@ public boolean equals(Object obj) {
235253
}
236254
OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
237255
return Objects.equals(jobId, other.jobId) &&
238-
Objects.equals(timeout, other.timeout);
256+
Objects.equals(timeout, other.timeout) &&
257+
Objects.equals(job, other.job);
239258
}
240259

241260
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.xcontent.XContentParser;
2727
import org.elasticsearch.index.mapper.DateFieldMapper;
2828
import org.elasticsearch.xpack.core.XPackPlugin;
29+
import org.elasticsearch.xpack.core.ml.MlTasks;
2930
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
3031
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3132
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -43,7 +44,6 @@ public class StartDatafeedAction
4344

4445
public static final StartDatafeedAction INSTANCE = new StartDatafeedAction();
4546
public static final String NAME = "cluster:admin/xpack/ml/datafeed/start";
46-
public static final String TASK_NAME = "xpack/ml/datafeed";
4747

4848
private StartDatafeedAction() {
4949
super(NAME);
@@ -147,8 +147,7 @@ public boolean equals(Object obj) {
147147

148148
public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {
149149

150-
public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, DatafeedParams::new);
151-
150+
public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new);
152151
static {
153152
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
154153
PARSER.declareString((params, startTime) -> params.startTime = parseDateOrThrow(
@@ -235,7 +234,7 @@ public void setTimeout(TimeValue timeout) {
235234

236235
@Override
237236
public String getWriteableName() {
238-
return TASK_NAME;
237+
return MlTasks.DATAFEED_TASK_NAME;
239238
}
240239

241240
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.elasticsearch.common.xcontent.XContentBuilder;
1414
import org.elasticsearch.common.xcontent.XContentParser;
1515
import org.elasticsearch.persistent.PersistentTaskState;
16-
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
16+
import org.elasticsearch.xpack.core.ml.MlTasks;
1717

1818
import java.io.IOException;
1919
import java.util.Locale;
@@ -24,7 +24,7 @@ public enum DatafeedState implements PersistentTaskState {
2424

2525
STARTED, STOPPED, STARTING, STOPPING;
2626

27-
public static final String NAME = StartDatafeedAction.TASK_NAME;
27+
public static final String NAME = MlTasks.DATAFEED_TASK_NAME;
2828

2929
private static final ConstructingObjectParser<DatafeedState, Void> PARSER =
3030
new ConstructingObjectParser<>(NAME, args -> fromString((String) args[0]));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import org.elasticsearch.common.xcontent.XContentParser;
1515
import org.elasticsearch.persistent.PersistentTaskState;
1616
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
17-
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
17+
import org.elasticsearch.xpack.core.ml.MlTasks;
1818

1919
import java.io.IOException;
2020
import java.util.Objects;
@@ -23,7 +23,7 @@
2323

2424
public class JobTaskState implements PersistentTaskState {
2525

26-
public static final String NAME = OpenJobAction.TASK_NAME;
26+
public static final String NAME = MlTasks.JOB_TASK_NAME;
2727

2828
private static ParseField STATE = new ParseField("state");
2929
private static ParseField ALLOCATION_ID = new ParseField("allocation_id");

0 commit comments

Comments
 (0)