Skip to content

Commit 9874b2f

Browse files
committed
[ML] Job in index: Datafeed node selector (#34218)
1 parent 47787b3 commit 9874b2f

File tree

14 files changed

+437
-311
lines changed

14 files changed

+437
-311
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@
2828
import org.elasticsearch.xpack.core.XPackPlugin;
2929
import org.elasticsearch.xpack.core.ml.MlTasks;
3030
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
31+
import org.elasticsearch.xpack.core.ml.job.config.Job;
3132
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3233
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3334

3435
import java.io.IOException;
36+
import java.util.Collections;
37+
import java.util.List;
3538
import java.util.Objects;
3639
import java.util.function.LongSupplier;
3740

@@ -147,6 +150,8 @@ public boolean equals(Object obj) {
147150

148151
public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {
149152

153+
public static final ParseField INDICES = new ParseField("indices");
154+
150155
public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new);
151156
static {
152157
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
@@ -155,6 +160,8 @@ public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskPar
155160
PARSER.declareString(DatafeedParams::setEndTime, END_TIME);
156161
PARSER.declareString((params, val) ->
157162
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
163+
PARSER.declareString(DatafeedParams::setJobId, Job.ID);
164+
PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES);
158165
}
159166

160167
static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
@@ -194,6 +201,10 @@ public DatafeedParams(StreamInput in) throws IOException {
194201
startTime = in.readVLong();
195202
endTime = in.readOptionalLong();
196203
timeout = TimeValue.timeValueMillis(in.readVLong());
204+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
205+
jobId = in.readOptionalString();
206+
datafeedIndices = in.readList(StreamInput::readString);
207+
}
197208
}
198209

199210
DatafeedParams() {
@@ -203,6 +214,9 @@ public DatafeedParams(StreamInput in) throws IOException {
203214
private long startTime;
204215
private Long endTime;
205216
private TimeValue timeout = TimeValue.timeValueSeconds(20);
217+
private List<String> datafeedIndices = Collections.emptyList();
218+
private String jobId;
219+
206220

207221
public String getDatafeedId() {
208222
return datafeedId;
@@ -232,6 +246,22 @@ public void setTimeout(TimeValue timeout) {
232246
this.timeout = timeout;
233247
}
234248

249+
public String getJobId() {
250+
return jobId;
251+
}
252+
253+
public void setJobId(String jobId) {
254+
this.jobId = jobId;
255+
}
256+
257+
public List<String> getDatafeedIndices() {
258+
return datafeedIndices;
259+
}
260+
261+
public void setDatafeedIndices(List<String> datafeedIndices) {
262+
this.datafeedIndices = datafeedIndices;
263+
}
264+
235265
@Override
236266
public String getWriteableName() {
237267
return MlTasks.DATAFEED_TASK_NAME;
@@ -248,6 +278,10 @@ public void writeTo(StreamOutput out) throws IOException {
248278
out.writeVLong(startTime);
249279
out.writeOptionalLong(endTime);
250280
out.writeVLong(timeout.millis());
281+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
282+
out.writeOptionalString(jobId);
283+
out.writeStringList(datafeedIndices);
284+
}
251285
}
252286

253287
@Override
@@ -259,13 +293,19 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
259293
builder.field(END_TIME.getPreferredName(), String.valueOf(endTime));
260294
}
261295
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
296+
if (jobId != null) {
297+
builder.field(Job.ID.getPreferredName(), jobId);
298+
}
299+
if (datafeedIndices.isEmpty() == false) {
300+
builder.field(INDICES.getPreferredName(), datafeedIndices);
301+
}
262302
builder.endObject();
263303
return builder;
264304
}
265305

266306
@Override
267307
public int hashCode() {
268-
return Objects.hash(datafeedId, startTime, endTime, timeout);
308+
return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices);
269309
}
270310

271311
@Override
@@ -280,7 +320,9 @@ public boolean equals(Object obj) {
280320
return Objects.equals(datafeedId, other.datafeedId) &&
281321
Objects.equals(startTime, other.startTime) &&
282322
Objects.equals(endTime, other.endTime) &&
283-
Objects.equals(timeout, other.timeout);
323+
Objects.equals(timeout, other.timeout) &&
324+
Objects.equals(jobId, other.jobId) &&
325+
Objects.equals(datafeedIndices, other.datafeedIndices);
284326
}
285327
}
286328

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.test.AbstractSerializingTestCase;
1313

1414
import java.io.IOException;
15+
import java.util.Arrays;
1516

1617
public class DatafeedParamsTests extends AbstractSerializingTestCase<StartDatafeedAction.DatafeedParams> {
1718
@Override
@@ -28,6 +29,13 @@ public static StartDatafeedAction.DatafeedParams createDatafeedParams() {
2829
if (randomBoolean()) {
2930
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
3031
}
32+
if (randomBoolean()) {
33+
params.setJobId(randomAlphaOfLength(10));
34+
}
35+
if (randomBoolean()) {
36+
params.setDatafeedIndices(Arrays.asList(randomAlphaOfLength(10), randomAlphaOfLength(10)));
37+
}
38+
3139
return params;
3240
}
3341

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
411411
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
412412
normalizerFactory, xContentRegistry, auditor);
413413
this.autodetectProcessManager.set(autodetectProcessManager);
414-
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
414+
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
415+
auditor, System::currentTimeMillis);
415416
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
416417
System::currentTimeMillis, auditor);
417418
this.datafeedManager.set(datafeedManager);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,13 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.component.AbstractComponent;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
16+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
17+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
1518
import org.elasticsearch.threadpool.ThreadPool;
16-
import org.elasticsearch.xpack.core.ml.MlMetadata;
1719
import org.elasticsearch.xpack.core.ml.MlTasks;
1820
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
1921
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
20-
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
21-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
22-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
23-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
2422
import org.elasticsearch.xpack.ml.notifications.Auditor;
2523

2624
import java.util.Objects;
@@ -89,16 +87,20 @@ public void clusterChanged(ClusterChangedEvent event) {
8987
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
9088
}
9189
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
92-
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
93-
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
90+
StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams();
91+
String jobId = datafeedParams.getJobId();
9492
if (currentAssignment.getExecutorNode() == null) {
95-
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
93+
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
9694
currentAssignment.getExplanation() + "]";
97-
logger.warn("[{}] {}", datafeedConfig.getJobId(), msg);
98-
auditor.warning(datafeedConfig.getJobId(), msg);
95+
logger.warn("[{}] {}", jobId, msg);
96+
if (jobId != null) {
97+
auditor.warning(jobId, msg);
98+
}
9999
} else {
100100
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
101-
auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]");
101+
if (jobId != null) {
102+
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
103+
}
102104
}
103105
}
104106
}

0 commit comments

Comments
 (0)