Skip to content

Commit 15a9fdf

Browse files
authored
[ML] Job in index: Enable get and update actions for clusterstate jobs (#35598)
Small fixes to read from all locations and added index mappings for DelayedDataCheckConfig
1 parent 49781b9 commit 15a9fdf

File tree

14 files changed

+190
-52
lines changed

14 files changed

+190
-52
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.common.xcontent.XContentBuilder;
1010
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
1111
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
12+
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
1213
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
1314
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
1415
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@@ -358,6 +359,16 @@ public static void addDatafeedConfigFields(XContentBuilder builder) throws IOExc
358359
.endObject()
359360
.endObject()
360361
.endObject()
362+
.startObject(DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName())
363+
.startObject(PROPERTIES)
364+
.startObject(DelayedDataCheckConfig.ENABLED.getPreferredName())
365+
.field(TYPE, BOOLEAN)
366+
.endObject()
367+
.startObject(DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName())
368+
.field(TYPE, KEYWORD)
369+
.endObject()
370+
.endObject()
371+
.endObject()
361372
.startObject(DatafeedConfig.HEADERS.getPreferredName())
362373
.field(ENABLED, false)
363374
.endObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
99
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
10+
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
1011
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
1112
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
1213
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@@ -251,6 +252,9 @@ public final class ReservedFieldNames {
251252
DatafeedConfig.SCRIPT_FIELDS.getPreferredName(),
252253
DatafeedConfig.CHUNKING_CONFIG.getPreferredName(),
253254
DatafeedConfig.HEADERS.getPreferredName(),
255+
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(),
256+
DelayedDataCheckConfig.ENABLED.getPreferredName(),
257+
DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName(),
254258

255259
ChunkingConfig.MODE_FIELD.getPreferredName(),
256260
ChunkingConfig.TIME_SPAN_FIELD.getPreferredName(),

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/groups/GroupOrJobLookupTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.List;
1616

1717
import static org.hamcrest.Matchers.contains;
18+
import static org.hamcrest.Matchers.empty;
1819
import static org.hamcrest.Matchers.equalTo;
1920
import static org.hamcrest.Matchers.is;
2021
import static org.mockito.Mockito.mock;
@@ -103,6 +104,8 @@ public void testExpandGroupIds() {
103104
assertThat(groupOrJobLookup.expandGroupIds("foo*"), contains("foo-group"));
104105
assertThat(groupOrJobLookup.expandGroupIds("bar-group,nogroup"), contains("bar-group"));
105106
assertThat(groupOrJobLookup.expandGroupIds("*"), contains("bar-group", "foo-group"));
107+
assertThat(groupOrJobLookup.expandGroupIds("foo-group"), contains("foo-group"));
108+
assertThat(groupOrJobLookup.expandGroupIds("no-group"), empty());
106109
}
107110

108111
private static Job mockJob(String jobId, List<String> groups) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetBucketsAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ public TransportGetBucketsAction(Settings settings, ThreadPool threadPool, Trans
3838

3939
@Override
4040
protected void doExecute(GetBucketsAction.Request request, ActionListener<GetBucketsAction.Response> listener) {
41-
jobManager.getJob(request.getJobId(), ActionListener.wrap(
42-
job -> {
41+
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
42+
jobFound -> {
4343
BucketsQueryBuilder query =
4444
new BucketsQueryBuilder().expand(request.isExpand())
4545
.includeInterim(request.isExcludeInterim() == false)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetCalendarEventsAction.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
1818
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
1919
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
20-
import org.elasticsearch.xpack.core.ml.job.config.Job;
2120
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
22-
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
21+
import org.elasticsearch.xpack.ml.job.JobManager;
2322
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
2423
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
2524

@@ -29,17 +28,17 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<Get
2928
GetCalendarEventsAction.Response> {
3029

3130
private final JobResultsProvider jobResultsProvider;
32-
private final JobConfigProvider jobConfigProvider;
31+
private final JobManager jobManager;
3332

3433
@Inject
3534
public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool,
3635
TransportService transportService, ActionFilters actionFilters,
3736
IndexNameExpressionResolver indexNameExpressionResolver,
38-
JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) {
37+
JobResultsProvider jobResultsProvider, JobManager jobManager) {
3938
super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters,
4039
indexNameExpressionResolver, GetCalendarEventsAction.Request::new);
4140
this.jobResultsProvider = jobResultsProvider;
42-
this.jobConfigProvider = jobConfigProvider;
41+
this.jobManager = jobManager;
4342
}
4443

4544
@Override
@@ -66,15 +65,13 @@ protected void doExecute(GetCalendarEventsAction.Request request,
6665

6766
if (request.getJobId() != null) {
6867

69-
jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap(
70-
jobBuiler -> {
71-
Job job = jobBuiler.build();
68+
jobManager.getJob(request.getJobId(), ActionListener.wrap(
69+
job -> {
7270
jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener);
73-
7471
},
7572
jobNotFound -> {
7673
// is the request Id a group?
77-
jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap(
74+
jobManager.groupExists(request.getJobId(), ActionListener.wrap(
7875
groupExists -> {
7976
if (groupExists) {
8077
jobResultsProvider.scheduledEventsForJob(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetInfluencersAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ public TransportGetInfluencersAction(Settings settings, ThreadPool threadPool, T
3939
@Override
4040
protected void doExecute(GetInfluencersAction.Request request, ActionListener<GetInfluencersAction.Response> listener) {
4141

42-
jobManager.getJob(request.getJobId(), ActionListener.wrap(
43-
job -> {
42+
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
43+
jobFound -> {
4444
InfluencersQueryBuilder.InfluencersQuery query = new InfluencersQueryBuilder()
4545
.includeInterim(request.isExcludeInterim() == false)
4646
.start(request.getStart())

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetRecordsAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ public TransportGetRecordsAction(Settings settings, ThreadPool threadPool, Trans
3939
@Override
4040
protected void doExecute(GetRecordsAction.Request request, ActionListener<GetRecordsAction.Response> listener) {
4141

42-
jobManager.getJob(request.getJobId(), ActionListener.wrap(
43-
job -> {
42+
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
43+
jobFound -> {
4444
RecordsQueryBuilder query = new RecordsQueryBuilder()
4545
.includeInterim(request.isExcludeInterim() == false)
4646
.epochStart(request.getStart())

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2121
import org.elasticsearch.client.Client;
2222
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2324
import org.elasticsearch.cluster.block.ClusterBlockException;
2425
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2526
import org.elasticsearch.cluster.metadata.AliasOrIndex;
2627
import org.elasticsearch.cluster.metadata.IndexMetaData;
2728
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2829
import org.elasticsearch.cluster.metadata.MappingMetaData;
30+
import org.elasticsearch.cluster.metadata.MetaData;
2931
import org.elasticsearch.cluster.node.DiscoveryNode;
3032
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3133
import org.elasticsearch.cluster.service.ClusterService;
@@ -52,6 +54,7 @@
5254
import org.elasticsearch.transport.TransportService;
5355
import org.elasticsearch.xpack.core.XPackField;
5456
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
57+
import org.elasticsearch.xpack.core.ml.MlMetadata;
5558
import org.elasticsearch.xpack.core.ml.MlTasks;
5659
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
5760
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
@@ -66,6 +69,8 @@
6669
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
6770
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
6871
import org.elasticsearch.xpack.ml.MachineLearning;
72+
import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate;
73+
import org.elasticsearch.xpack.ml.job.JobManager;
6974
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
7075
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
7176
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@@ -102,24 +107,26 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
102107
private final XPackLicenseState licenseState;
103108
private final PersistentTasksService persistentTasksService;
104109
private final Client client;
105-
private final JobResultsProvider jobResultsProvider;
106110
private final JobConfigProvider jobConfigProvider;
111+
private final JobResultsProvider jobResultsProvider;
112+
private final JobManager jobManager;
107113
private final MlMemoryTracker memoryTracker;
108114

109115
@Inject
110116
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
111117
XPackLicenseState licenseState, ClusterService clusterService,
112118
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
113119
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
114-
JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider,
115-
MlMemoryTracker memoryTracker) {
120+
JobResultsProvider jobResultsProvider, JobManager jobManager,
121+
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
116122
super(settings, OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
117123
indexNameExpressionResolver, OpenJobAction.Request::new);
118124
this.licenseState = licenseState;
119125
this.persistentTasksService = persistentTasksService;
120126
this.client = client;
121127
this.jobResultsProvider = jobResultsProvider;
122128
this.jobConfigProvider = jobConfigProvider;
129+
this.jobManager = jobManager;
123130
this.memoryTracker = memoryTracker;
124131
}
125132

@@ -618,10 +625,10 @@ public void onFailure(Exception e) {
618625
);
619626

620627
// Get the job config
621-
jobConfigProvider.getJob(jobParams.getJobId(), ActionListener.wrap(
622-
builder -> {
628+
jobManager.getJob(jobParams.getJobId(), ActionListener.wrap(
629+
job -> {
623630
try {
624-
jobParams.setJob(builder.build());
631+
jobParams.setJob(job);
625632

626633
// Try adding results doc mapping
627634
addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()),
@@ -670,16 +677,48 @@ public void onTimeout(TimeValue timeout) {
670677
}
671678

672679
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
673-
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
674680

675-
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
676-
job -> listener.onResponse(new AcknowledgedResponse(true)),
677-
e -> {
678-
logger.error("[" + jobId + "] Failed to clear finished_time", e);
679-
// Not a critical error so continue
681+
boolean jobIsInClusterState = ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId);
682+
if (jobIsInClusterState) {
683+
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
684+
@Override
685+
public ClusterState execute(ClusterState currentState) {
686+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
687+
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
688+
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
689+
jobBuilder.setFinishedTime(null);
690+
691+
mlMetadataBuilder.putJob(jobBuilder.build(), true);
692+
ClusterState.Builder builder = ClusterState.builder(currentState);
693+
return builder.metaData(new MetaData.Builder(currentState.metaData())
694+
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
695+
.build();
696+
}
697+
698+
@Override
699+
public void onFailure(String source, Exception e) {
700+
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
680701
listener.onResponse(new AcknowledgedResponse(true));
681702
}
682-
));
703+
704+
@Override
705+
public void clusterStateProcessed(String source, ClusterState oldState,
706+
ClusterState newState) {
707+
listener.onResponse(new AcknowledgedResponse(true));
708+
}
709+
});
710+
} else {
711+
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
712+
713+
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
714+
job -> listener.onResponse(new AcknowledgedResponse(true)),
715+
e -> {
716+
logger.error("[" + jobId + "] Failed to clear finished_time", e);
717+
// Not a critical error so continue
718+
listener.onResponse(new AcknowledgedResponse(true));
719+
}
720+
));
721+
}
683722
}
684723

685724
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,21 @@
1010
import org.elasticsearch.action.support.HandledTransportAction;
1111
import org.elasticsearch.client.Client;
1212
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
13+
import org.elasticsearch.cluster.service.ClusterService;
1314
import org.elasticsearch.common.bytes.BytesArray;
1415
import org.elasticsearch.common.inject.Inject;
1516
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
1618
import org.elasticsearch.threadpool.ThreadPool;
1719
import org.elasticsearch.transport.TransportService;
1820
import org.elasticsearch.xpack.core.ClientHelper;
1921
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
2022
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
2123
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2224
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
25+
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader;
2326
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
24-
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
25-
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
27+
import org.elasticsearch.xpack.ml.job.JobManager;
2628

2729
import java.io.BufferedReader;
2830
import java.io.InputStream;
@@ -35,29 +37,30 @@
3537
public class TransportPreviewDatafeedAction extends HandledTransportAction<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response> {
3638

3739
private final Client client;
38-
private final JobConfigProvider jobConfigProvider;
39-
private final DatafeedConfigProvider datafeedConfigProvider;
40+
private final ClusterService clusterService;
41+
private final JobManager jobManager;
42+
private final DatafeedConfigReader datafeedConfigReader;
4043

4144
@Inject
4245
public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService,
4346
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
44-
Client client, JobConfigProvider jobConfigProvider,
45-
DatafeedConfigProvider datafeedConfigProvider) {
47+
Client client, JobManager jobManager, NamedXContentRegistry xContentRegistry,
48+
ClusterService clusterService) {
4649
super(settings, PreviewDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
4750
PreviewDatafeedAction.Request::new);
4851
this.client = client;
49-
this.jobConfigProvider = jobConfigProvider;
50-
this.datafeedConfigProvider = datafeedConfigProvider;
52+
this.clusterService = clusterService;
53+
this.jobManager = jobManager;
54+
this.datafeedConfigReader = new DatafeedConfigReader(client, xContentRegistry);
5155
}
5256

5357
@Override
5458
protected void doExecute(PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
5559

56-
datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
57-
datafeedConfigBuilder -> {
58-
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
59-
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
60-
jobBuilder -> {
60+
datafeedConfigReader.datafeedConfig(request.getDatafeedId(), clusterService.state(), ActionListener.wrap(
61+
datafeedConfig -> {
62+
jobManager.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
63+
job -> {
6164
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
6265
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
6366
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
@@ -66,7 +69,7 @@ protected void doExecute(PreviewDatafeedAction.Request request, ActionListener<P
6669
// NB: this is using the client from the transport layer, NOT the internal client.
6770
// This is important because it means the datafeed search will fail if the user
6871
// requesting the preview doesn't have permission to search the relevant indices.
69-
DataExtractorFactory.create(client, previewDatafeed.build(), jobBuilder.build(),
72+
DataExtractorFactory.create(client, previewDatafeed.build(), job,
7073
new ActionListener<DataExtractorFactory>() {
7174
@Override
7275
public void onResponse(DataExtractorFactory dataExtractorFactory) {

0 commit comments

Comments
 (0)