From c3ca17e797ab4b7e2eec57fd151c504488d3ce6a Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Sun, 24 Nov 2019 19:41:13 +0200 Subject: [PATCH 1/3] [ML] Apply source query on data frame analytics memory estimation Closes #49454 --- .../ExplainDataFrameAnalyticsIT.java | 64 +++++++++++++++++++ ...NativeDataFrameAnalyticsIntegTestCase.java | 6 ++ .../DataFrameDataExtractorFactory.java | 22 +++++-- 3 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java new file mode 100644 index 0000000000000..c27d043fcc77c --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; +import org.elasticsearch.xpack.core.ml.utils.QueryProvider; + +import java.io.IOException; + +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase { + + public void testSourceQueryIsApplied() throws IOException { + // To test the source query is applied when we extract data, + // we set up a job where we have a query which excludes all but one document. + // We then assert the memory estimation is low enough. + + String sourceIndex = "test-source-query-is-applied"; + + client().admin().indices().prepareCreate(sourceIndex) + .addMapping("_doc", "numeric_1", "type=double", "numeric_2", "type=float", "categorical", "type=keyword") + .get(); + + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < 30; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + + // We insert one odd value out of 5 for one feature + indexRequest.source("numeric_1", 1.0, "numeric_2", 2.0, "categorical", i == 0 ? "only-one" : "normal"); + bulkRequestBuilder.add(indexRequest); + } + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + String id = "test_source_query_is_applied"; + + DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(); + configBuilder.setId(id); + configBuilder.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, + QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one")))); + configBuilder.setAnalysis(new Classification("categorical")); + DataFrameAnalyticsConfig config = configBuilder.buildForExplain(); + + ExplainDataFrameAnalyticsAction.Response explainResponse = explainDataFrame(config); + + assertThat(explainResponse.getMemoryEstimation().getExpectedMemoryWithoutDisk().getKb(), lessThanOrEqualTo(500L)); + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index c4b39ac1975db..2c23894b340a1 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -20,6 +20,7 @@ import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; +import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; @@ -146,6 +147,11 @@ protected GetDataFrameAnalyticsStatsAction.Response.Stats getAnalyticsStats(Stri return stats.get(0); } + protected ExplainDataFrameAnalyticsAction.Response explainDataFrame(DataFrameAnalyticsConfig config) { + PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(config); + return client().execute(ExplainDataFrameAnalyticsAction.INSTANCE, request).actionGet(); + } + protected EvaluateDataFrameAction.Response evaluateDataFrame(String index, Evaluation evaluation) { EvaluateDataFrameAction.Request request = new EvaluateDataFrameAction.Request() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index f8afd22909831..1c060f178644d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -25,15 +25,18 @@ public class DataFrameDataExtractorFactory { private final Client client; private final String analyticsId; private final List indices; + private final QueryBuilder sourceQuery; private final ExtractedFields extractedFields; private final Map headers; private final boolean includeRowsWithMissingValues; - public DataFrameDataExtractorFactory(Client client, String analyticsId, List indices, ExtractedFields extractedFields, - Map headers, boolean includeRowsWithMissingValues) { + private DataFrameDataExtractorFactory(Client client, String analyticsId, List indices, QueryBuilder sourceQuery, + ExtractedFields extractedFields, Map headers, + boolean includeRowsWithMissingValues) { this.client = Objects.requireNonNull(client); this.analyticsId = Objects.requireNonNull(analyticsId); this.indices = Objects.requireNonNull(indices); + this.sourceQuery = Objects.requireNonNull(sourceQuery); this.extractedFields = Objects.requireNonNull(extractedFields); this.headers = headers; this.includeRowsWithMissingValues = includeRowsWithMissingValues; @@ -54,7 +57,12 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) { } private QueryBuilder createQuery() { - return includeRowsWithMissingValues ? QueryBuilders.matchAllQuery() : allExtractedFieldsExistQuery(); + BoolQueryBuilder query = QueryBuilders.boolQuery(); + query.filter(sourceQuery); + if (includeRowsWithMissingValues == false) { + query.filter(allExtractedFieldsExistQuery()); + } + return query; } private QueryBuilder allExtractedFieldsExistQuery() { @@ -77,8 +85,8 @@ private QueryBuilder allExtractedFieldsExistQuery() { */ public static DataFrameDataExtractorFactory createForSourceIndices(Client client, String taskId, DataFrameAnalyticsConfig config, ExtractedFields extractedFields) { - return new DataFrameDataExtractorFactory(client, taskId, Arrays.asList(config.getSource().getIndex()), extractedFields, - config.getHeaders(), config.getAnalysis().supportsMissingValues()); + return new DataFrameDataExtractorFactory(client, taskId, Arrays.asList(config.getSource().getIndex()), + config.getSource().getParsedQuery(), extractedFields, config.getHeaders(), config.getAnalysis().supportsMissingValues()); } /** @@ -100,8 +108,8 @@ public static void createForDestinationIndex(Client client, extractedFieldsDetector -> { ExtractedFields extractedFields = extractedFieldsDetector.detect().v1(); DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, config.getId(), - Collections.singletonList(config.getDest().getIndex()), extractedFields, config.getHeaders(), - config.getAnalysis().supportsMissingValues()); + Collections.singletonList(config.getDest().getIndex()), config.getSource().getParsedQuery(), extractedFields, + config.getHeaders(), config.getAnalysis().supportsMissingValues()); listener.onResponse(extractorFactory); }, listener::onFailure From 686a2138581101b82b9f67f2f1cdfa014323538d Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Sun, 24 Nov 2019 20:55:05 +0200 Subject: [PATCH 2/3] Fix bwc tests --- .../mixed_cluster/90_ml_data_frame_analytics_crud.yml | 8 ++++---- .../old_cluster/90_ml_data_frame_analytics_crud.yml | 10 ++++++---- .../90_ml_data_frame_analytics_crud.yml | 6 +++--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml index 443925e6739d6..b0cb91c4c0f5c 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/90_ml_data_frame_analytics_crud.yml @@ -7,7 +7,7 @@ - match: { count: 1 } - match: { data_frame_analytics.0.id: "old_cluster_outlier_detection_job" } - match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] } - - match: { data_frame_analytics.0.source.query: {"term" : { "user" : "Kimchy" }} } + - match: { data_frame_analytics.0.source.query: {"term" : { "user.keyword" : "Kimchy" }} } - match: { data_frame_analytics.0.dest.index: "old_cluster_outlier_detection_job_results" } - match: { data_frame_analytics.0.analysis: { "outlier_detection":{ @@ -56,7 +56,7 @@ - match: { count: 1 } - match: { data_frame_analytics.0.id: "old_cluster_regression_job" } - match: { data_frame_analytics.0.source.index: ["bwc_ml_regression_job_source"] } - - match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} } + - match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} } - match: { data_frame_analytics.0.dest.index: "old_cluster_regression_job_results" } - match: { data_frame_analytics.0.analysis.regression.dependent_variable: "foo" } - match: { data_frame_analytics.0.analysis.regression.training_percent: 100.0 } @@ -101,7 +101,7 @@ { "source": { "index": "bwc_ml_outlier_detection_job_source", - "query": {"term" : { "user" : "Kimchy" }} + "query": {"term" : { "user.keyword" : "Kimchy" }} }, "dest": { "index": "mixed_cluster_outlier_detection_job_results" @@ -116,7 +116,7 @@ - match: { count: 1 } - match: { data_frame_analytics.0.id: "mixed_cluster_outlier_detection_job" } - match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] } - - match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} } + - match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} } - match: { data_frame_analytics.0.dest.index: "mixed_cluster_outlier_detection_job_results" } - match: { data_frame_analytics.0.analysis: { "outlier_detection":{ diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/90_ml_data_frame_analytics_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/90_ml_data_frame_analytics_crud.yml index 7cfd2fe1fd15b..fe160bba15f23 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/90_ml_data_frame_analytics_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/90_ml_data_frame_analytics_crud.yml @@ -5,7 +5,8 @@ setup: index: bwc_ml_outlier_detection_job_source body: > { - "numeric_field_1": 42.0 + "numeric_field_1": 42.0, + "user": "Kimchy" } - do: @@ -14,7 +15,8 @@ setup: body: > { "numeric_field_1": 1.0, - "foo": 10.0 + "foo": 10.0, + "user": "Kimchy" } - do: @@ -31,7 +33,7 @@ setup: { "source": { "index": "bwc_ml_outlier_detection_job_source", - "query": {"term" : { "user" : "Kimchy" }} + "query": {"term" : { "user.keyword" : "Kimchy" }} }, "dest": { "index": "old_cluster_outlier_detection_job_results" @@ -50,7 +52,7 @@ setup: { "source": { "index": "bwc_ml_regression_job_source", - "query": {"term" : { "user" : "Kimchy" }} + "query": {"term" : { "user.keyword" : "Kimchy" }} }, "dest": { "index": "old_cluster_regression_job_results" diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/90_ml_data_frame_analytics_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/90_ml_data_frame_analytics_crud.yml index a09865a7d8682..28ec80c6373a2 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/90_ml_data_frame_analytics_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/90_ml_data_frame_analytics_crud.yml @@ -7,7 +7,7 @@ - match: { count: 1 } - match: { data_frame_analytics.0.id: "old_cluster_outlier_detection_job" } - match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] } - - match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} } + - match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} } - match: { data_frame_analytics.0.dest.index: "old_cluster_outlier_detection_job_results" } - match: { data_frame_analytics.0.analysis: { "outlier_detection":{ @@ -36,7 +36,7 @@ - match: { count: 1 } - match: { data_frame_analytics.0.id: "old_cluster_regression_job" } - match: { data_frame_analytics.0.source.index: ["bwc_ml_regression_job_source"] } - - match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} } + - match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} } - match: { data_frame_analytics.0.dest.index: "old_cluster_regression_job_results" } - match: { data_frame_analytics.0.analysis.regression.dependent_variable: "foo" } - match: { data_frame_analytics.0.analysis.regression.training_percent: 100.0 } @@ -62,7 +62,7 @@ - match: { count: 1 } - match: { data_frame_analytics.0.id: "mixed_cluster_outlier_detection_job" } - match: { data_frame_analytics.0.source.index: ["bwc_ml_outlier_detection_job_source"] } - - match: { data_frame_analytics.0.source.query: {"term": { "user": "Kimchy" }} } + - match: { data_frame_analytics.0.source.query: {"term": { "user.keyword": "Kimchy" }} } - match: { data_frame_analytics.0.dest.index: "mixed_cluster_outlier_detection_job_results" } - match: { data_frame_analytics.0.analysis: { "outlier_detection":{ From e265ec2be27a6cf00b11f53b1e3a60a3b98f5a0b Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 25 Nov 2019 11:07:39 +0200 Subject: [PATCH 3/3] Better use of builder pattern --- .../ml/integration/ExplainDataFrameAnalyticsIT.java | 12 ++++++------ .../MlNativeDataFrameAnalyticsIntegTestCase.java | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java index c27d043fcc77c..6796e3b7223d7 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ExplainDataFrameAnalyticsIT.java @@ -50,12 +50,12 @@ public void testSourceQueryIsApplied() throws IOException { String id = "test_source_query_is_applied"; - DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(); - configBuilder.setId(id); - configBuilder.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, - QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one")))); - configBuilder.setAnalysis(new Classification("categorical")); - DataFrameAnalyticsConfig config = configBuilder.buildForExplain(); + DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder() + .setId(id) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, + QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one")))) + .setAnalysis(new Classification("categorical")) + .buildForExplain(); ExplainDataFrameAnalyticsAction.Response explainResponse = explainDataFrame(config); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 2c23894b340a1..06c88a9793b23 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -162,12 +162,12 @@ protected EvaluateDataFrameAction.Response evaluateDataFrame(String index, Evalu protected static DataFrameAnalyticsConfig buildAnalytics(String id, String sourceIndex, String destIndex, @Nullable String resultsField, DataFrameAnalysis analysis) { - DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(); - configBuilder.setId(id); - configBuilder.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null)); - configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField)); - configBuilder.setAnalysis(analysis); - return configBuilder.build(); + return new DataFrameAnalyticsConfig.Builder() + .setId(id) + .setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null)) + .setDest(new DataFrameAnalyticsDest(destIndex, resultsField)) + .setAnalysis(analysis) + .build(); } protected void assertIsStopped(String id) {