Skip to content

Commit 1865d87

Browse files
[ML] Apply source query on data frame analytics memory estimation (#49517)
Closes #49454
1 parent 3a2339f commit 1865d87

File tree

6 files changed

+104
-24
lines changed

6 files changed

+104
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
9+
import org.elasticsearch.action.bulk.BulkResponse;
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.action.support.WriteRequest;
12+
import org.elasticsearch.index.query.QueryBuilders;
13+
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
14+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
15+
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
16+
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
17+
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
18+
19+
import java.io.IOException;
20+
21+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
22+
23+
public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase {
24+
25+
public void testSourceQueryIsApplied() throws IOException {
26+
// To test the source query is applied when we extract data,
27+
// we set up a job where we have a query which excludes all but one document.
28+
// We then assert the memory estimation is low enough.
29+
30+
String sourceIndex = "test-source-query-is-applied";
31+
32+
client().admin().indices().prepareCreate(sourceIndex)
33+
.addMapping("_doc", "numeric_1", "type=double", "numeric_2", "type=float", "categorical", "type=keyword")
34+
.get();
35+
36+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
37+
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
38+
39+
for (int i = 0; i < 30; i++) {
40+
IndexRequest indexRequest = new IndexRequest(sourceIndex);
41+
42+
// We insert one odd value out of 5 for one feature
43+
indexRequest.source("numeric_1", 1.0, "numeric_2", 2.0, "categorical", i == 0 ? "only-one" : "normal");
44+
bulkRequestBuilder.add(indexRequest);
45+
}
46+
BulkResponse bulkResponse = bulkRequestBuilder.get();
47+
if (bulkResponse.hasFailures()) {
48+
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
49+
}
50+
51+
String id = "test_source_query_is_applied";
52+
53+
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
54+
.setId(id)
55+
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex },
56+
QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one"))))
57+
.setAnalysis(new Classification("categorical"))
58+
.buildForExplain();
59+
60+
ExplainDataFrameAnalyticsAction.Response explainResponse = explainDataFrame(config);
61+
62+
assertThat(explainResponse.getMemoryEstimation().getExpectedMemoryWithoutDisk().getKb(), lessThanOrEqualTo(500L));
63+
}
64+
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.search.sort.SortOrder;
2121
import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
2222
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
23+
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
2324
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
2425
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
2526
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
@@ -146,6 +147,11 @@ protected GetDataFrameAnalyticsStatsAction.Response.Stats getAnalyticsStats(Stri
146147
return stats.get(0);
147148
}
148149

150+
protected ExplainDataFrameAnalyticsAction.Response explainDataFrame(DataFrameAnalyticsConfig config) {
151+
PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(config);
152+
return client().execute(ExplainDataFrameAnalyticsAction.INSTANCE, request).actionGet();
153+
}
154+
149155
protected EvaluateDataFrameAction.Response evaluateDataFrame(String index, Evaluation evaluation) {
150156
EvaluateDataFrameAction.Request request =
151157
new EvaluateDataFrameAction.Request()
@@ -156,12 +162,12 @@ protected EvaluateDataFrameAction.Response evaluateDataFrame(String index, Evalu
156162

157163
protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourceIndex, String destIndex,
158164
@Nullable String resultsField, DataFrameAnalysis analysis) {
159-
DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder();
160-
configBuilder.setId(id);
161-
configBuilder.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null));
162-
configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField));
163-
configBuilder.setAnalysis(analysis);
164-
return configBuilder.build();
165+
return new DataFrameAnalyticsConfig.Builder()
166+
.setId(id)
167+
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null))
168+
.setDest(new DataFrameAnalyticsDest(destIndex, resultsField))
169+
.setAnalysis(analysis)
170+
.build();
165171
}
166172

167173
protected void assertIsStopped(String id) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,18 @@ public class DataFrameDataExtractorFactory {
2525
private final Client client;
2626
private final String analyticsId;
2727
private final List<String> indices;
28+
private final QueryBuilder sourceQuery;
2829
private final ExtractedFields extractedFields;
2930
private final Map<String, String> headers;
3031
private final boolean includeRowsWithMissingValues;
3132

32-
public DataFrameDataExtractorFactory(Client client, String analyticsId, List<String> indices, ExtractedFields extractedFields,
33-
Map<String, String> headers, boolean includeRowsWithMissingValues) {
33+
private DataFrameDataExtractorFactory(Client client, String analyticsId, List<String> indices, QueryBuilder sourceQuery,
34+
ExtractedFields extractedFields, Map<String, String> headers,
35+
boolean includeRowsWithMissingValues) {
3436
this.client = Objects.requireNonNull(client);
3537
this.analyticsId = Objects.requireNonNull(analyticsId);
3638
this.indices = Objects.requireNonNull(indices);
39+
this.sourceQuery = Objects.requireNonNull(sourceQuery);
3740
this.extractedFields = Objects.requireNonNull(extractedFields);
3841
this.headers = headers;
3942
this.includeRowsWithMissingValues = includeRowsWithMissingValues;
@@ -54,7 +57,12 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) {
5457
}
5558

5659
private QueryBuilder createQuery() {
57-
return includeRowsWithMissingValues ? QueryBuilders.matchAllQuery() : allExtractedFieldsExistQuery();
60+
BoolQueryBuilder query = QueryBuilders.boolQuery();
61+
query.filter(sourceQuery);
62+
if (includeRowsWithMissingValues == false) {
63+
query.filter(allExtractedFieldsExistQuery());
64+
}
65+
return query;
5866
}
5967

6068
private QueryBuilder allExtractedFieldsExistQuery() {
@@ -77,8 +85,8 @@ private QueryBuilder allExtractedFieldsExistQuery() {
7785
*/
7886
public static DataFrameDataExtractorFactory createForSourceIndices(Client client, String taskId, DataFrameAnalyticsConfig config,
7987
ExtractedFields extractedFields) {
80-
return new DataFrameDataExtractorFactory(client, taskId, Arrays.asList(config.getSource().getIndex()), extractedFields,
81-
config.getHeaders(), config.getAnalysis().supportsMissingValues());
88+
return new DataFrameDataExtractorFactory(client, taskId, Arrays.asList(config.getSource().getIndex()),
89+
config.getSource().getParsedQuery(), extractedFields, config.getHeaders(), config.getAnalysis().supportsMissingValues());
8290
}
8391

8492
/**
@@ -100,8 +108,8 @@ public static void createForDestinationIndex(Client client,
100108
extractedFieldsDetector -> {
101109
ExtractedFields extractedFields = extractedFieldsDetector.detect().v1();
102110
DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, config.getId(),
103-
Collections.singletonList(config.getDest().getIndex()), extractedFields, config.getHeaders(),
104-
config.getAnalysis().supportsMissingValues());
111+
Collections.singletonList(config.getDest().getIndex()), config.getSource().getParsedQuery(), extractedFields,
112+
config.getHeaders(), config.getAnalysis().supportsMissingValues());
105113
listener.onResponse(extractorFactory);
106114
},
107115
listener::onFailure

x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
- match: { count: 1 }
88
- match: { data_frame_analytics.0.id: "old_cluster_outlier_detection_job" }
99
- match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] }
10-
- match: { data_frame_analytics.0.source.query: {"term" : { "user" : "Kimchy" }} }
10+
- match: { data_frame_analytics.0.source.query: {"term" : { "user.keyword" : "Kimchy" }} }
1111
- match: { data_frame_analytics.0.dest.index: "old_cluster_outlier_detection_job_results" }
1212
- match: { data_frame_analytics.0.analysis: {
1313
"outlier_detection":{
@@ -56,7 +56,7 @@
5656
- match: { count: 1 }
5757
- match: { data_frame_analytics.0.id: "old_cluster_regression_job" }
5858
- match: { data_frame_analytics.0.source.index: ["bwc_ml_regression_job_source"] }
59-
- match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} }
59+
- match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} }
6060
- match: { data_frame_analytics.0.dest.index: "old_cluster_regression_job_results" }
6161
- match: { data_frame_analytics.0.analysis.regression.dependent_variable: "foo" }
6262
- match: { data_frame_analytics.0.analysis.regression.training_percent: 100.0 }
@@ -101,7 +101,7 @@
101101
{
102102
"source": {
103103
"index": "bwc_ml_outlier_detection_job_source",
104-
"query": {"term" : { "user" : "Kimchy" }}
104+
"query": {"term" : { "user.keyword" : "Kimchy" }}
105105
},
106106
"dest": {
107107
"index": "mixed_cluster_outlier_detection_job_results"
@@ -116,7 +116,7 @@
116116
- match: { count: 1 }
117117
- match: { data_frame_analytics.0.id: "mixed_cluster_outlier_detection_job" }
118118
- match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] }
119-
- match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} }
119+
- match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} }
120120
- match: { data_frame_analytics.0.dest.index: "mixed_cluster_outlier_detection_job_results" }
121121
- match: { data_frame_analytics.0.analysis: {
122122
"outlier_detection":{

x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/90_ml_data_frame_analytics_crud.yml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ setup:
55
index: bwc_ml_outlier_detection_job_source
66
body: >
77
{
8-
"numeric_field_1": 42.0
8+
"numeric_field_1": 42.0,
9+
"user": "Kimchy"
910
}
1011
1112
- do:
@@ -14,7 +15,8 @@ setup:
1415
body: >
1516
{
1617
"numeric_field_1": 1.0,
17-
"foo": 10.0
18+
"foo": 10.0,
19+
"user": "Kimchy"
1820
}
1921
2022
- do:
@@ -31,7 +33,7 @@ setup:
3133
{
3234
"source": {
3335
"index": "bwc_ml_outlier_detection_job_source",
34-
"query": {"term" : { "user" : "Kimchy" }}
36+
"query": {"term" : { "user.keyword" : "Kimchy" }}
3537
},
3638
"dest": {
3739
"index": "old_cluster_outlier_detection_job_results"
@@ -50,7 +52,7 @@ setup:
5052
{
5153
"source": {
5254
"index": "bwc_ml_regression_job_source",
53-
"query": {"term" : { "user" : "Kimchy" }}
55+
"query": {"term" : { "user.keyword" : "Kimchy" }}
5456
},
5557
"dest": {
5658
"index": "old_cluster_regression_job_results"

x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/90_ml_data_frame_analytics_crud.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
- match: { count: 1 }
88
- match: { data_frame_analytics.0.id: "old_cluster_outlier_detection_job" }
99
- match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] }
10-
- match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} }
10+
- match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} }
1111
- match: { data_frame_analytics.0.dest.index: "old_cluster_outlier_detection_job_results" }
1212
- match: { data_frame_analytics.0.analysis: {
1313
"outlier_detection":{
@@ -36,7 +36,7 @@
3636
- match: { count: 1 }
3737
- match: { data_frame_analytics.0.id: "old_cluster_regression_job" }
3838
- match: { data_frame_analytics.0.source.index: ["bwc_ml_regression_job_source"] }
39-
- match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} }
39+
- match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} }
4040
- match: { data_frame_analytics.0.dest.index: "old_cluster_regression_job_results" }
4141
- match: { data_frame_analytics.0.analysis.regression.dependent_variable: "foo" }
4242
- match: { data_frame_analytics.0.analysis.regression.training_percent: 100.0 }
@@ -62,7 +62,7 @@
6262
- match: { count: 1 }
6363
- match: { data_frame_analytics.0.id: "mixed_cluster_outlier_detection_job" }
6464
- match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] }
65-
- match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} }
65+
- match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} }
6666
- match: { data_frame_analytics.0.dest.index: "mixed_cluster_outlier_detection_job_results" }
6767
- match: { data_frame_analytics.0.analysis: {
6868
"outlier_detection":{

0 commit comments

Comments
 (0)