Skip to content

Commit 7c8b98e

Browse files
authored
[ML] Job in Index: Stop and preview datafeed (#34605)
1 parent 9874b2f commit 7c8b98e

File tree

3 files changed

+85
-146
lines changed

3 files changed

+85
-146
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,19 @@
1010
import org.elasticsearch.action.support.HandledTransportAction;
1111
import org.elasticsearch.client.Client;
1212
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
13-
import org.elasticsearch.cluster.service.ClusterService;
1413
import org.elasticsearch.common.bytes.BytesArray;
1514
import org.elasticsearch.common.inject.Inject;
1615
import org.elasticsearch.common.settings.Settings;
1716
import org.elasticsearch.threadpool.ThreadPool;
1817
import org.elasticsearch.transport.TransportService;
1918
import org.elasticsearch.xpack.core.ClientHelper;
20-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2119
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
2220
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
2321
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2422
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
25-
import org.elasticsearch.xpack.core.ml.job.config.Job;
26-
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2723
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
24+
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
25+
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
2826

2927
import java.io.BufferedReader;
3028
import java.io.InputStream;
@@ -37,51 +35,56 @@
3735
public class TransportPreviewDatafeedAction extends HandledTransportAction<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response> {
3836

3937
private final Client client;
40-
private final ClusterService clusterService;
38+
private final JobConfigProvider jobConfigProvider;
39+
private final DatafeedConfigProvider datafeedConfigProvider;
4140

4241
@Inject
4342
public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService,
4443
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
45-
Client client, ClusterService clusterService) {
44+
Client client, JobConfigProvider jobConfigProvider,
45+
DatafeedConfigProvider datafeedConfigProvider) {
4646
super(settings, PreviewDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
4747
PreviewDatafeedAction.Request::new);
4848
this.client = client;
49-
this.clusterService = clusterService;
49+
this.jobConfigProvider = jobConfigProvider;
50+
this.datafeedConfigProvider = datafeedConfigProvider;
5051
}
5152

5253
@Override
5354
protected void doExecute(PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
54-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
55-
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
56-
if (datafeed == null) {
57-
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
58-
}
59-
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
60-
if (job == null) {
61-
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
62-
}
6355

64-
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeed);
65-
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
66-
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
67-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
68-
previewDatafeed.setHeaders(headers);
69-
// NB: this is using the client from the transport layer, NOT the internal client.
70-
// This is important because it means the datafeed search will fail if the user
71-
// requesting the preview doesn't have permission to search the relevant indices.
72-
DataExtractorFactory.create(client, previewDatafeed.build(), job, new ActionListener<DataExtractorFactory>() {
73-
@Override
74-
public void onResponse(DataExtractorFactory dataExtractorFactory) {
75-
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
76-
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
77-
}
78-
79-
@Override
80-
public void onFailure(Exception e) {
81-
listener.onFailure(e);
82-
}
83-
});
56+
datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
57+
datafeedConfigBuilder -> {
58+
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
59+
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
60+
jobBuilder -> {
61+
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
62+
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
63+
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
64+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
65+
previewDatafeed.setHeaders(headers);
66+
// NB: this is using the client from the transport layer, NOT the internal client.
67+
// This is important because it means the datafeed search will fail if the user
68+
// requesting the preview doesn't have permission to search the relevant indices.
69+
DataExtractorFactory.create(client, previewDatafeed.build(), jobBuilder.build(),
70+
new ActionListener<DataExtractorFactory>() {
71+
@Override
72+
public void onResponse(DataExtractorFactory dataExtractorFactory) {
73+
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
74+
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
75+
}
8476

77+
@Override
78+
public void onFailure(Exception e) {
79+
listener.onFailure(e);
80+
}
81+
});
82+
},
83+
listener::onFailure
84+
));
85+
},
86+
listener::onFailure
87+
));
8588
}
8689

8790
/** Visible for testing */

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java

Lines changed: 34 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,17 @@
2323
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2424
import org.elasticsearch.common.util.concurrent.AtomicArray;
2525
import org.elasticsearch.discovery.MasterNotDiscoveredException;
26+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
27+
import org.elasticsearch.persistent.PersistentTasksService;
2628
import org.elasticsearch.tasks.Task;
2729
import org.elasticsearch.threadpool.ThreadPool;
2830
import org.elasticsearch.transport.TransportService;
29-
import org.elasticsearch.xpack.core.ml.MlMetadata;
3031
import org.elasticsearch.xpack.core.ml.MlTasks;
3132
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
32-
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
3333
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
34-
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3534
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
36-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
37-
import org.elasticsearch.persistent.PersistentTasksService;
3835
import org.elasticsearch.xpack.ml.MachineLearning;
36+
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3937

4038
import java.io.IOException;
4139
import java.util.ArrayList;
@@ -50,35 +48,36 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
5048
StopDatafeedAction.Response, StopDatafeedAction.Response> {
5149

5250
private final PersistentTasksService persistentTasksService;
51+
private final DatafeedConfigProvider datafeedConfigProvider;
5352

5453
@Inject
5554
public TransportStopDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
5655
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
57-
ClusterService clusterService, PersistentTasksService persistentTasksService) {
56+
ClusterService clusterService, PersistentTasksService persistentTasksService,
57+
DatafeedConfigProvider datafeedConfigProvider) {
5858
super(settings, StopDatafeedAction.NAME, threadPool, clusterService, transportService, actionFilters,
5959
indexNameExpressionResolver, StopDatafeedAction.Request::new, StopDatafeedAction.Response::new,
6060
MachineLearning.UTILITY_THREAD_POOL_NAME);
6161
this.persistentTasksService = persistentTasksService;
62+
this.datafeedConfigProvider = datafeedConfigProvider;
63+
6264
}
6365

6466
/**
65-
* Resolve the requested datafeeds and add their IDs to one of the list
66-
* arguments depending on datafeed state.
67+
* Sort the datafeed IDs the their task state and add to one
68+
* of the list arguments depending on the state.
6769
*
68-
* @param request The stop datafeed request
69-
* @param mlMetadata ML Metadata
70+
* @param expandedDatafeedIds The expanded set of IDs
7071
* @param tasks Persistent task meta data
7172
* @param startedDatafeedIds Started datafeed ids are added to this list
7273
* @param stoppingDatafeedIds Stopping datafeed ids are added to this list
7374
*/
74-
static void resolveDataFeedIds(StopDatafeedAction.Request request, MlMetadata mlMetadata,
75-
PersistentTasksCustomMetaData tasks,
76-
List<String> startedDatafeedIds,
77-
List<String> stoppingDatafeedIds) {
75+
static void sortDatafeedIdsByTaskState(Set<String> expandedDatafeedIds,
76+
PersistentTasksCustomMetaData tasks,
77+
List<String> startedDatafeedIds,
78+
List<String> stoppingDatafeedIds) {
7879

79-
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
8080
for (String expandedDatafeedId : expandedDatafeedIds) {
81-
validateDatafeedTask(expandedDatafeedId, mlMetadata);
8281
addDatafeedTaskIdAccordingToState(expandedDatafeedId, MlTasks.getDatafeedState(expandedDatafeedId, tasks),
8382
startedDatafeedIds, stoppingDatafeedIds);
8483
}
@@ -102,20 +101,6 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId,
102101
}
103102
}
104103

105-
/**
106-
* Validate the stop request.
107-
* Throws an {@code ResourceNotFoundException} if there is no datafeed
108-
* with id {@code datafeedId}
109-
* @param datafeedId The datafeed Id
110-
* @param mlMetadata ML meta data
111-
*/
112-
static void validateDatafeedTask(String datafeedId, MlMetadata mlMetadata) {
113-
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
114-
if (datafeed == null) {
115-
throw new ResourceNotFoundException(Messages.getMessage(Messages.DATAFEED_NOT_FOUND, datafeedId));
116-
}
117-
}
118-
119104
@Override
120105
protected void doExecute(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener) {
121106
final ClusterState state = clusterService.state();
@@ -130,23 +115,27 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
130115
new ActionListenerResponseHandler<>(listener, StopDatafeedAction.Response::new));
131116
}
132117
} else {
133-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
134-
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
118+
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
119+
expandedIds -> {
120+
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
135121

136-
List<String> startedDatafeeds = new ArrayList<>();
137-
List<String> stoppingDatafeeds = new ArrayList<>();
138-
resolveDataFeedIds(request, mlMetadata, tasks, startedDatafeeds, stoppingDatafeeds);
139-
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
140-
listener.onResponse(new StopDatafeedAction.Response(true));
141-
return;
142-
}
143-
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
122+
List<String> startedDatafeeds = new ArrayList<>();
123+
List<String> stoppingDatafeeds = new ArrayList<>();
124+
sortDatafeedIdsByTaskState(expandedIds, tasks, startedDatafeeds, stoppingDatafeeds);
125+
if (startedDatafeeds.isEmpty() && stoppingDatafeeds.isEmpty()) {
126+
listener.onResponse(new StopDatafeedAction.Response(true));
127+
return;
128+
}
129+
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));
144130

145-
if (request.isForce()) {
146-
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
147-
} else {
148-
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
149-
}
131+
if (request.isForce()) {
132+
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
133+
} else {
134+
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
135+
}
136+
},
137+
listener::onFailure
138+
));
150139
}
151140
}
152141

0 commit comments

Comments
 (0)