Skip to content

Commit f8614a1

Browse files
committed
[ML] Job in index: Datafeed node selector (elastic#34218)
1 parent e6e78fe commit f8614a1

File tree

14 files changed

+437
-311
lines changed

14 files changed

+437
-311
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDatafeedAction.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@
2828
import org.elasticsearch.xpack.core.XPackPlugin;
2929
import org.elasticsearch.xpack.core.ml.MlTasks;
3030
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
31+
import org.elasticsearch.xpack.core.ml.job.config.Job;
3132
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3233
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3334

3435
import java.io.IOException;
36+
import java.util.Collections;
37+
import java.util.List;
3538
import java.util.Objects;
3639
import java.util.function.LongSupplier;
3740

@@ -141,6 +144,8 @@ public boolean equals(Object obj) {
141144

142145
public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {
143146

147+
public static final ParseField INDICES = new ParseField("indices");
148+
144149
public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new);
145150
static {
146151
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
@@ -149,6 +154,8 @@ public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskPar
149154
PARSER.declareString(DatafeedParams::setEndTime, END_TIME);
150155
PARSER.declareString((params, val) ->
151156
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
157+
PARSER.declareString(DatafeedParams::setJobId, Job.ID);
158+
PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES);
152159
}
153160

154161
static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
@@ -188,6 +195,10 @@ public DatafeedParams(StreamInput in) throws IOException {
188195
startTime = in.readVLong();
189196
endTime = in.readOptionalLong();
190197
timeout = TimeValue.timeValueMillis(in.readVLong());
198+
if (in.getVersion().onOrAfter(Version.CURRENT)) {
199+
jobId = in.readOptionalString();
200+
datafeedIndices = in.readList(StreamInput::readString);
201+
}
191202
}
192203

193204
DatafeedParams() {
@@ -197,6 +208,9 @@ public DatafeedParams(StreamInput in) throws IOException {
197208
private long startTime;
198209
private Long endTime;
199210
private TimeValue timeout = TimeValue.timeValueSeconds(20);
211+
private List<String> datafeedIndices = Collections.emptyList();
212+
private String jobId;
213+
200214

201215
public String getDatafeedId() {
202216
return datafeedId;
@@ -226,6 +240,22 @@ public void setTimeout(TimeValue timeout) {
226240
this.timeout = timeout;
227241
}
228242

243+
public String getJobId() {
244+
return jobId;
245+
}
246+
247+
public void setJobId(String jobId) {
248+
this.jobId = jobId;
249+
}
250+
251+
public List<String> getDatafeedIndices() {
252+
return datafeedIndices;
253+
}
254+
255+
public void setDatafeedIndices(List<String> datafeedIndices) {
256+
this.datafeedIndices = datafeedIndices;
257+
}
258+
229259
@Override
230260
public String getWriteableName() {
231261
return MlTasks.DATAFEED_TASK_NAME;
@@ -242,6 +272,10 @@ public void writeTo(StreamOutput out) throws IOException {
242272
out.writeVLong(startTime);
243273
out.writeOptionalLong(endTime);
244274
out.writeVLong(timeout.millis());
275+
if (out.getVersion().onOrAfter(Version.CURRENT)) {
276+
out.writeOptionalString(jobId);
277+
out.writeStringList(datafeedIndices);
278+
}
245279
}
246280

247281
@Override
@@ -253,13 +287,19 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
253287
builder.field(END_TIME.getPreferredName(), String.valueOf(endTime));
254288
}
255289
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
290+
if (jobId != null) {
291+
builder.field(Job.ID.getPreferredName(), jobId);
292+
}
293+
if (datafeedIndices.isEmpty() == false) {
294+
builder.field(INDICES.getPreferredName(), datafeedIndices);
295+
}
256296
builder.endObject();
257297
return builder;
258298
}
259299

260300
@Override
261301
public int hashCode() {
262-
return Objects.hash(datafeedId, startTime, endTime, timeout);
302+
return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices);
263303
}
264304

265305
@Override
@@ -274,7 +314,9 @@ public boolean equals(Object obj) {
274314
return Objects.equals(datafeedId, other.datafeedId) &&
275315
Objects.equals(startTime, other.startTime) &&
276316
Objects.equals(endTime, other.endTime) &&
277-
Objects.equals(timeout, other.timeout);
317+
Objects.equals(timeout, other.timeout) &&
318+
Objects.equals(jobId, other.jobId) &&
319+
Objects.equals(datafeedIndices, other.datafeedIndices);
278320
}
279321
}
280322

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DatafeedParamsTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.test.AbstractSerializingTestCase;
1313

1414
import java.io.IOException;
15+
import java.util.Arrays;
1516

1617
public class DatafeedParamsTests extends AbstractSerializingTestCase<StartDatafeedAction.DatafeedParams> {
1718
@Override
@@ -28,6 +29,13 @@ public static StartDatafeedAction.DatafeedParams createDatafeedParams() {
2829
if (randomBoolean()) {
2930
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
3031
}
32+
if (randomBoolean()) {
33+
params.setJobId(randomAlphaOfLength(10));
34+
}
35+
if (randomBoolean()) {
36+
params.setDatafeedIndices(Arrays.asList(randomAlphaOfLength(10), randomAlphaOfLength(10)));
37+
}
38+
3139
return params;
3240
}
3341

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
408408
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
409409
normalizerFactory, xContentRegistry, auditor);
410410
this.autodetectProcessManager.set(autodetectProcessManager);
411-
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
411+
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
412+
auditor, System::currentTimeMillis);
412413
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
413414
System::currentTimeMillis, auditor);
414415
this.datafeedManager.set(datafeedManager);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,13 @@
1212
import org.elasticsearch.cluster.service.ClusterService;
1313
import org.elasticsearch.common.component.AbstractComponent;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
16+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
17+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
1518
import org.elasticsearch.threadpool.ThreadPool;
16-
import org.elasticsearch.xpack.core.ml.MlMetadata;
1719
import org.elasticsearch.xpack.core.ml.MlTasks;
1820
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
1921
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
20-
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
21-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
22-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
23-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
2422
import org.elasticsearch.xpack.ml.notifications.Auditor;
2523

2624
import java.util.Objects;
@@ -89,16 +87,20 @@ public void clusterChanged(ClusterChangedEvent event) {
8987
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
9088
}
9189
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
92-
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
93-
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
90+
StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams();
91+
String jobId = datafeedParams.getJobId();
9492
if (currentAssignment.getExecutorNode() == null) {
95-
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
93+
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
9694
currentAssignment.getExplanation() + "]";
97-
logger.warn("[{}] {}", datafeedConfig.getJobId(), msg);
98-
auditor.warning(datafeedConfig.getJobId(), msg);
95+
logger.warn("[{}] {}", jobId, msg);
96+
if (jobId != null) {
97+
auditor.warning(jobId, msg);
98+
}
9999
} else {
100100
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
101-
auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]");
101+
if (jobId != null) {
102+
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
103+
}
102104
}
103105
}
104106
}

0 commit comments

Comments
 (0)