Skip to content

Commit 527334d

Browse files
authored
[ML] fixing bug where analytics process starts with 0 rows (#45879)
The native process requires a non-zero number of rows to analyze. If the flag `--rows 0` is passed to the executable, it throws and does not start. When building the configuration for the process, we should not start the native process if there are no rows. This change also adds logging to indicate when that happens.
1 parent 715f7e9 commit 527334d

File tree

5 files changed

+96
-6
lines changed

5 files changed

+96
-6
lines changed

x-pack/plugin/ml/qa/ml-with-security/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ integTest.runner {
146146
'ml/start_data_frame_analytics/Test start given source index has no compatible fields',
147147
'ml/start_data_frame_analytics/Test start with inconsistent body/param ids',
148148
'ml/start_data_frame_analytics/Test start given dest index is not empty',
149+
'ml/start_data_frame_analytics/Test start with compatible fields but no data',
149150
'ml/start_stop_datafeed/Test start datafeed job, but not open',
150151
'ml/start_stop_datafeed/Test start non existing datafeed',
151152
'ml/start_stop_datafeed/Test stop non existing datafeed',

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3535
import org.elasticsearch.cluster.service.ClusterService;
3636
import org.elasticsearch.common.Nullable;
37+
import org.elasticsearch.common.Strings;
3738
import org.elasticsearch.common.inject.Inject;
3839
import org.elasticsearch.common.io.stream.StreamInput;
3940
import org.elasticsearch.common.settings.Settings;
@@ -226,10 +227,41 @@ public void onFailure(Exception e) {
226227
}
227228

228229
private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {
230+
231+
// Step 5. Validate that there are analyzable data in the source index
232+
ActionListener<DataFrameAnalyticsConfig> validateMappingsMergeListener = ActionListener.wrap(
233+
config -> DataFrameDataExtractorFactory.createForSourceIndices(client,
234+
"validate_source_index_has_rows-" + id,
235+
config,
236+
ActionListener.wrap(
237+
dataFrameDataExtractorFactory ->
238+
dataFrameDataExtractorFactory
239+
.newExtractor(false)
240+
.collectDataSummaryAsync(ActionListener.wrap(
241+
dataSummary -> {
242+
if (dataSummary.rows == 0) {
243+
finalListener.onFailure(new ElasticsearchStatusException(
244+
"Unable to start {} as there are no analyzable data in source indices [{}].",
245+
RestStatus.BAD_REQUEST,
246+
id,
247+
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())
248+
));
249+
} else {
250+
finalListener.onResponse(config);
251+
}
252+
},
253+
finalListener::onFailure
254+
)),
255+
finalListener::onFailure
256+
))
257+
,
258+
finalListener::onFailure
259+
);
260+
229261
// Step 4. Validate mappings can be merged
230262
ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
231263
config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
232-
mappings -> finalListener.onResponse(config), finalListener::onFailure)),
264+
mappings -> validateMappingsMergeListener.onResponse(config), finalListener::onFailure)),
233265
finalListener::onFailure
234266
);
235267

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
1010
import org.apache.logging.log4j.message.ParameterizedMessage;
11+
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.search.ClearScrollAction;
1213
import org.elasticsearch.action.search.ClearScrollRequest;
1314
import org.elasticsearch.action.search.SearchAction;
@@ -234,14 +235,33 @@ public List<String> getFieldNames() {
234235
}
235236

236237
public DataSummary collectDataSummary() {
237-
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
238+
SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
239+
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
240+
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
241+
}
242+
243+
public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {
244+
SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
245+
final int numberOfFields = context.extractedFields.getAllFields().size();
246+
247+
ClientHelper.executeWithHeadersAsync(context.headers,
248+
ClientHelper.ML_ORIGIN,
249+
client,
250+
SearchAction.INSTANCE,
251+
searchRequestBuilder.request(),
252+
ActionListener.wrap(
253+
searchResponse -> dataSummaryActionListener.onResponse(
254+
new DataSummary(searchResponse.getHits().getTotalHits().value, numberOfFields)),
255+
dataSummaryActionListener::onFailure
256+
));
257+
}
258+
259+
private SearchRequestBuilder buildDataSummarySearchRequestBuilder() {
260+
return new SearchRequestBuilder(client, SearchAction.INSTANCE)
238261
.setIndices(context.indices)
239262
.setSize(0)
240263
.setQuery(context.query)
241264
.setTrackTotalHits(true);
242-
243-
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
244-
return new DataSummary(searchResponse.getHits().getTotalHits().value, context.extractedFields.getAllFields().size());
245265
}
246266

247267
public Set<String> getCategoricalFields() {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
1212
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1313
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.threadpool.ThreadPool;
1516
import org.elasticsearch.xpack.core.ClientHelper;
1617
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -273,7 +274,15 @@ private synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtr
273274
}
274275

275276
dataExtractor = dataExtractorFactory.newExtractor(false);
276-
process = createProcess(task, createProcessConfig(config, dataExtractor));
277+
AnalyticsProcessConfig analyticsProcessConfig = createProcessConfig(config, dataExtractor);
278+
LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
279+
// If we have no rows, that means there is no data so no point in starting the native process
280+
// just finish the task
281+
if (analyticsProcessConfig.rows() == 0) {
282+
LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
283+
return false;
284+
}
285+
process = createProcess(task, analyticsProcessConfig);
277286
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
278287
dataExtractorFactory.newExtractor(true));
279288
resultProcessor = new AnalyticsResultProcessor(id, dataFrameRowsJoiner, this::isProcessKilled, task.getProgressTracker());

x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,34 @@
6262
id: "foo"
6363

6464
---
65+
"Test start with compatible fields but no data":
66+
- do:
67+
indices.create:
68+
index: empty-index-with-compatible-fields
69+
body:
70+
mappings:
71+
properties:
72+
long_field: { "type": "long" }
73+
74+
- do:
75+
ml.put_data_frame_analytics:
76+
id: "empty-with-compatible-fields"
77+
body: >
78+
{
79+
"source": {
80+
"index": "empty-index-with-compatible-fields"
81+
},
82+
"dest": {
83+
"index": "empty-index-with-compatible-fields-dest"
84+
},
85+
"analysis": {"outlier_detection":{}}
86+
}
87+
88+
- do:
89+
catch: /Unable to start empty-with-compatible-fields as there are no analyzable data in source indices \[empty-index-with-compatible-fields\]/
90+
ml.start_data_frame_analytics:
91+
id: "empty-with-compatible-fields"
92+
---
6593
"Test start with inconsistent body/param ids":
6694

6795
- do:

0 commit comments

Comments
 (0)