From a4d94b90b914e0432dbe27d7d1f328bd128a9ed1 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 26 Jul 2019 17:53:54 +0300 Subject: [PATCH 1/2] [ML] Outlier detection should only fetch docs that have the analyzed fields As data frame rows with missing values for analyzed fields are skipped, we can be more efficient by including a query that only picks documents that have values for all analyzed fields. Besides reducing the number of documents we go through, we also provide a more accurate measurement of how many rows we need, which reduces the memory requirements. This also adds an integration test that runs outlier detection on data with missing fields. --- ...NativeDataFrameAnalyticsIntegTestCase.java | 37 ++++--- .../OutlierDetectionWithMissingFieldsIT.java | 103 ++++++++++++++++++ .../integration/RunDataFrameAnalyticsIT.java | 26 +---- .../DataFrameDataExtractorFactory.java | 13 ++- 4 files changed, 137 insertions(+), 42 deletions(-) create mode 100644 x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 87e723db04896..56ea04793c3cb 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import
org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -16,15 +17,16 @@ import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; @@ -97,22 +99,23 @@ protected List getAnalyticsStat return response.getResponse().results(); } - protected List generateData(long timestamp, TimeValue bucketSpan, int bucketCount, - Function timeToCountFunction) throws IOException { - List data = new ArrayList<>(); - long now = timestamp; - for (int bucketIndex = 0; bucketIndex < bucketCount; bucketIndex++) { - for (int count = 0; count < timeToCountFunction.apply(bucketIndex); count++) { - Map record = new HashMap<>(); - record.put("time", now); - data.add(createJsonRecord(record)); - } - now += bucketSpan.getMillis(); - } - return data; - } - protected static String createJsonRecord(Map keyValueMap) throws IOException { return Strings.toString(JsonXContent.contentBuilder().map(keyValueMap)) + "\n"; } + + protected static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex, + @Nullable String resultsField) { + DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(id); + configBuilder.setSource(new DataFrameAnalyticsSource(sourceIndex, null)); + 
configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField)); + configBuilder.setAnalysis(new OutlierDetection()); + return configBuilder.build(); + } + + protected void assertState(String id, DataFrameAnalyticsState state) { + List stats = getAnalyticsStats(id); + assertThat(stats.size(), equalTo(1)); + assertThat(stats.get(0).getId(), equalTo(id)); + assertThat(stats.get(0).getState(), equalTo(state)); + } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java new file mode 100644 index 0000000000000..6062e75c1d7c6 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.junit.After; + +import java.util.Map; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class OutlierDetectionWithMissingFieldsIT extends MlNativeDataFrameAnalyticsIntegTestCase { + + @After + public void cleanup() { + cleanUp(); + } + + public void testMissingFields() throws Exception { + String sourceIndex = "test-outlier-detection-with-missing-fields"; + + client().admin().indices().prepareCreate(sourceIndex) + .addMapping("_doc", "numeric", "type=double", "categorical", "type=keyword") + .get(); + + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + // 5 docs with valid value + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest(sourceIndex); + indexRequest.source("numeric", 42.0, "categorical", "foo"); + bulkRequestBuilder.add(indexRequest); + } + + // Add a doc with missing field + { + IndexRequest missingIndexRequest = new IndexRequest(sourceIndex); + missingIndexRequest.source("categorical", "foo"); + bulkRequestBuilder.add(missingIndexRequest); + } + + // Add a doc with numeric being array which is also treated as missing + { + IndexRequest arrayIndexRequest = 
new IndexRequest(sourceIndex); + arrayIndexRequest.source("numeric", new double[]{1.0, 2.0}, "categorical", "foo"); + bulkRequestBuilder.add(arrayIndexRequest); + } + + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); + } + + String id = "test_outlier_detection_with_missing_fields"; + DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, new String[] {sourceIndex}, sourceIndex + "-results", null); + registerAnalytics(config); + putAnalytics(config); + + assertState(id, DataFrameAnalyticsState.STOPPED); + + startAnalytics(id); + waitUntilAnalyticsIsStopped(id); + + SearchResponse sourceData = client().prepareSearch(sourceIndex).get(); + for (SearchHit hit : sourceData.getHits()) { + GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get(); + assertThat(destDocGetResponse.isExists(), is(true)); + Map sourceDoc = hit.getSourceAsMap(); + Map destDoc = destDocGetResponse.getSource(); + for (String field : sourceDoc.keySet()) { + assertThat(destDoc.containsKey(field), is(true)); + assertThat(destDoc.get(field), equalTo(sourceDoc.get(field))); + } + if (destDoc.containsKey("numeric") && destDoc.get("numeric") instanceof Double) { + assertThat(destDoc.containsKey("ml"), is(true)); + @SuppressWarnings("unchecked") + Map resultsObject = (Map) destDoc.get("ml"); + + assertThat(resultsObject.containsKey("outlier_score"), is(true)); + double outlierScore = (double) resultsObject.get("outlier_score"); + assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0))); + } else { + assertThat(destDoc.containsKey("ml"), is(false)); + } + } + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java index f711223d691fe..3e4fd4f700313 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java @@ -13,20 +13,14 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; -import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; -import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection; import org.junit.After; -import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.allOf; @@ -97,7 +91,7 @@ public void testOutlierDetectionWithFewDocuments() throws Exception { assertThat(resultsObject.containsKey("outlier_score"), is(true)); double outlierScore = (double) resultsObject.get("outlier_score"); - assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0))); + assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0))); if (hit.getId().equals("outlier")) { scoreOfOutlier = outlierScore; } else { @@ -218,7 +212,7 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex 
assertThat(resultsObject.containsKey("outlier_score"), is(true)); double outlierScore = (double) resultsObject.get("outlier_score"); - assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0))); + assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0))); } } @@ -368,20 +362,4 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception { .setQuery(QueryBuilders.existsQuery("ml.outlier_score")).get(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) bulkRequestBuilder.numberOfActions())); } - - private static DataFrameAnalyticsConfig buildOutlierDetectionAnalytics(String id, String[] sourceIndex, String destIndex, - @Nullable String resultsField) { - DataFrameAnalyticsConfig.Builder configBuilder = new DataFrameAnalyticsConfig.Builder(id); - configBuilder.setSource(new DataFrameAnalyticsSource(sourceIndex, null)); - configBuilder.setDest(new DataFrameAnalyticsDest(destIndex, resultsField)); - configBuilder.setAnalysis(new OutlierDetection()); - return configBuilder.build(); - } - - private void assertState(String id, DataFrameAnalyticsState state) { - List stats = getAnalyticsStats(id); - assertThat(stats.size(), equalTo(1)); - assertThat(stats.get(0).getId(), equalTo(id)); - assertThat(stats.get(0).getState(), equalTo(state)); - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java index cacf00ad9e9bc..acdf527b84b0a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java @@ -19,9 +19,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import 
org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField; import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields; import java.util.Arrays; @@ -52,7 +55,7 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) { analyticsId, extractedFields, Arrays.asList(index), - QueryBuilders.matchAllQuery(), + allExtractedFieldsExistQuery(), 1000, headers, includeSource @@ -60,6 +63,14 @@ public DataFrameDataExtractor newExtractor(boolean includeSource) { return new DataFrameDataExtractor(client, context); } + private QueryBuilder allExtractedFieldsExistQuery() { + BoolQueryBuilder query = QueryBuilders.boolQuery(); + for (ExtractedField field : extractedFields.getAllFields()) { + query.filter(QueryBuilders.existsQuery(field.getName())); + } + return query; + } + /** * Validate and create a new extractor factory * From ebada3dfb2f40a00ba78e696b1c936f4b9b46be4 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 29 Jul 2019 13:07:20 +0300 Subject: [PATCH 2/2] Add missing values for the categorical field --- .../ml/integration/OutlierDetectionWithMissingFieldsIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java index 6062e75c1d7c6..79f3af3164a94 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java +++ 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OutlierDetectionWithMissingFieldsIT.java @@ -41,10 +41,10 @@ public void testMissingFields() throws Exception { BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - // 5 docs with valid value + // 5 docs with valid numeric value and missing categorical field (which should be ignored as it's not analyzed) for (int i = 0; i < 5; i++) { IndexRequest indexRequest = new IndexRequest(sourceIndex); - indexRequest.source("numeric", 42.0, "categorical", "foo"); + indexRequest.source("numeric", 42.0); bulkRequestBuilder.add(indexRequest); }