Skip to content

Commit eced353

Browse files
[FEATURE][ML] Fetch from source when fields are more than docvalue limit (#43204)
1 parent cf8a8bd commit eced353

File tree

7 files changed

+326
-37
lines changed

7 files changed

+326
-37
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@
66
package org.elasticsearch.xpack.ml.integration;
77

88
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
9+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
10+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
911
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1012
import org.elasticsearch.action.bulk.BulkResponse;
1113
import org.elasticsearch.action.get.GetResponse;
1214
import org.elasticsearch.action.index.IndexRequest;
1315
import org.elasticsearch.action.search.SearchResponse;
1416
import org.elasticsearch.action.support.WriteRequest;
1517
import org.elasticsearch.common.Nullable;
18+
import org.elasticsearch.common.xcontent.XContentType;
19+
import org.elasticsearch.index.IndexSettings;
1620
import org.elasticsearch.index.query.QueryBuilders;
1721
import org.elasticsearch.search.SearchHit;
1822
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@@ -147,6 +151,71 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
147151
assertThat(searchResponse.getHits().getTotalHits().value, equalTo((long) docCount));
148152
}
149153

154+
public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Exception {
155+
String sourceIndex = "test-outlier-detection-with-more-fields-than-docvalue-limit";
156+
157+
client().admin().indices().prepareCreate(sourceIndex).get();
158+
159+
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
160+
getSettingsRequest.indices(sourceIndex);
161+
getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
162+
getSettingsRequest.includeDefaults(true);
163+
164+
GetSettingsResponse docValueLimitSetting = client().admin().indices().getSettings(getSettingsRequest).actionGet();
165+
int docValueLimit = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(
166+
docValueLimitSetting.getIndexToSettings().values().iterator().next().value);
167+
168+
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
169+
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
170+
171+
for (int i = 0; i < 100; i++) {
172+
173+
StringBuilder source = new StringBuilder("{");
174+
for (int fieldCount = 0; fieldCount < docValueLimit + 1; fieldCount++) {
175+
source.append("\"field_").append(fieldCount).append("\":").append(randomDouble());
176+
if (fieldCount < docValueLimit) {
177+
source.append(",");
178+
}
179+
}
180+
source.append("}");
181+
182+
IndexRequest indexRequest = new IndexRequest(sourceIndex);
183+
indexRequest.source(source.toString(), XContentType.JSON);
184+
bulkRequestBuilder.add(indexRequest);
185+
}
186+
BulkResponse bulkResponse = bulkRequestBuilder.get();
187+
if (bulkResponse.hasFailures()) {
188+
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
189+
}
190+
191+
String id = "test_outlier_detection_with_more_fields_than_docvalue_limit";
192+
DataFrameAnalyticsConfig config = buildOutlierDetectionAnalytics(id, sourceIndex, null);
193+
registerAnalytics(config);
194+
putAnalytics(config);
195+
196+
assertState(id, DataFrameAnalyticsState.STOPPED);
197+
198+
startAnalytics(id);
199+
waitUntilAnalyticsIsStopped(id);
200+
201+
SearchResponse sourceData = client().prepareSearch(sourceIndex).get();
202+
for (SearchHit hit : sourceData.getHits()) {
203+
GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get();
204+
assertThat(destDocGetResponse.isExists(), is(true));
205+
Map<String, Object> sourceDoc = hit.getSourceAsMap();
206+
Map<String, Object> destDoc = destDocGetResponse.getSource();
207+
for (String field : sourceDoc.keySet()) {
208+
assertThat(destDoc.containsKey(field), is(true));
209+
assertThat(destDoc.get(field), equalTo(sourceDoc.get(field)));
210+
}
211+
assertThat(destDoc.containsKey("ml"), is(true));
212+
Map<String, Object> resultsObject = (Map<String, Object>) destDoc.get("ml");
213+
assertThat(resultsObject.containsKey("outlier_score"), is(true));
214+
double outlierScore = (double) resultsObject.get("outlier_score");
215+
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0)));
216+
}
217+
}
218+
150219
public void testStopOutlierDetectionWithEnoughDocumentsToScroll() {
151220
String sourceIndex = "test-outlier-detection-with-enough-docs-to-scroll";
152221

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/fields/ExtractedField.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public ExtractionMethod getExtractionMethod() {
5858

5959
public abstract Object[] value(SearchHit hit);
6060

61+
/**
 * Indicates whether {@link #newFromSource()} can produce a source-based
 * equivalent of this field. When this returns {@code false},
 * {@link #newFromSource()} throws an {@link IllegalStateException}.
 *
 * @return {@code true} if the field may be extracted from source instead
 */
public abstract boolean supportsFromSource();
62+
6163
public String getDocValueFormat() {
6264
return null;
6365
}
@@ -93,6 +95,14 @@ public static ExtractedField newField(String alias, String name, ExtractionMetho
9395
}
9496
}
9597

98+
public ExtractedField newFromSource() {
99+
if (supportsFromSource()) {
100+
return new FromSource(alias, name);
101+
}
102+
throw new IllegalStateException("Field (alias [" + alias + "], name [" + name + "]) should be extracted via ["
103+
+ extractionMethod + "] and cannot be extracted from source");
104+
}
105+
96106
private static class FromFields extends ExtractedField {
97107

98108
FromFields(String alias, String name, ExtractionMethod extractionMethod) {
@@ -108,6 +118,11 @@ public Object[] value(SearchHit hit) {
108118
}
109119
return new Object[0];
110120
}
121+
122+
@Override
123+
public boolean supportsFromSource() {
124+
return getExtractionMethod() == ExtractionMethod.DOC_VALUE;
125+
}
111126
}
112127

113128
private static class GeoShapeField extends FromSource {
@@ -195,6 +210,11 @@ private String handleString(String geoString) {
195210
throw new IllegalArgumentException("Unexpected value for a geo_point field: " + geoString);
196211
}
197212
}
213+
214+
@Override
215+
public boolean supportsFromSource() {
216+
return false;
217+
}
198218
}
199219

200220
private static class TimeField extends FromFields {
@@ -223,6 +243,11 @@ public Object[] value(SearchHit hit) {
223243
public String getDocValueFormat() {
224244
return EPOCH_MILLIS_FORMAT;
225245
}
246+
247+
@Override
248+
public boolean supportsFromSource() {
249+
return false;
250+
}
226251
}
227252

228253
private static class FromSource extends ExtractedField {
@@ -257,6 +282,11 @@ public Object[] value(SearchHit hit) {
257282
return new Object[0];
258283
}
259284

285+
@Override
286+
public boolean supportsFromSource() {
287+
return true;
288+
}
289+
260290
@SuppressWarnings("unchecked")
261291
private static Map<String, Object> getNextLevel(Map<String, Object> source, String key) {
262292
Object nextLevel = source.get(key);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractor.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.Nullable;
2020
import org.elasticsearch.common.unit.TimeValue;
2121
import org.elasticsearch.search.SearchHit;
22+
import org.elasticsearch.search.fetch.StoredFieldsContext;
2223
import org.elasticsearch.search.sort.SortOrder;
2324
import org.elasticsearch.xpack.core.ClientHelper;
2425
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
@@ -128,8 +129,8 @@ private SearchRequestBuilder buildSearchRequest() {
128129
.addSort(DataFrameAnalyticsFields.ID, SortOrder.ASC)
129130
.setIndices(context.indices)
130131
.setSize(context.scrollSize)
131-
.setQuery(context.query)
132-
.setFetchSource(context.includeSource);
132+
.setQuery(context.query);
133+
setFetchSource(searchRequestBuilder);
133134

134135
for (ExtractedField docValueField : context.extractedFields.getDocValueFields()) {
135136
searchRequestBuilder.addDocValueField(docValueField.getName(), docValueField.getDocValueFormat());
@@ -138,6 +139,20 @@ private SearchRequestBuilder buildSearchRequest() {
138139
return searchRequestBuilder;
139140
}
140141

142+
private void setFetchSource(SearchRequestBuilder searchRequestBuilder) {
143+
if (context.includeSource) {
144+
searchRequestBuilder.setFetchSource(true);
145+
} else {
146+
String[] sourceFields = context.extractedFields.getSourceFields();
147+
if (sourceFields.length == 0) {
148+
searchRequestBuilder.setFetchSource(false);
149+
searchRequestBuilder.storedFields(StoredFieldsContext._NONE_);
150+
} else {
151+
searchRequestBuilder.setFetchSource(sourceFields, null);
152+
}
153+
}
154+
}
155+
141156
private List<Row> processSearchResponse(SearchResponse searchResponse) throws IOException {
142157
scrollId = searchResponse.getScrollId();
143158
if (searchResponse.getHits().getHits().length == 0) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/DataFrameDataExtractorFactory.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,29 @@
55
*/
66
package org.elasticsearch.xpack.ml.dataframe.extractor;
77

8+
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
89
import org.elasticsearch.ResourceNotFoundException;
910
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
12+
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
1013
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
1114
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
1215
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
1316
import org.elasticsearch.client.Client;
17+
import org.elasticsearch.common.collect.ImmutableOpenMap;
18+
import org.elasticsearch.common.settings.Settings;
1419
import org.elasticsearch.index.IndexNotFoundException;
20+
import org.elasticsearch.index.IndexSettings;
1521
import org.elasticsearch.index.query.QueryBuilders;
1622
import org.elasticsearch.xpack.core.ClientHelper;
1723
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
1824
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
1925

2026
import java.util.Arrays;
27+
import java.util.Iterator;
2128
import java.util.Map;
2229
import java.util.Objects;
30+
import java.util.concurrent.atomic.AtomicInteger;
2331

2432
public class DataFrameDataExtractorFactory {
2533

@@ -96,29 +104,65 @@ private static void validateIndexAndExtractFields(Client client,
96104
DataFrameAnalyticsConfig config,
97105
boolean isTaskRestarting,
98106
ActionListener<ExtractedFields> listener) {
99-
// Step 2. Extract fields (if possible) and notify listener
107+
AtomicInteger docValueFieldsLimitHolder = new AtomicInteger();
108+
109+
// Step 3. Extract fields (if possible) and notify listener
100110
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
101-
fieldCapabilitiesResponse -> listener.onResponse(
102-
new ExtractedFieldsDetector(index, config, isTaskRestarting, fieldCapabilitiesResponse).detect()),
111+
fieldCapabilitiesResponse -> listener.onResponse(new ExtractedFieldsDetector(index, config, isTaskRestarting,
112+
docValueFieldsLimitHolder.get(), fieldCapabilitiesResponse).detect()),
113+
listener::onFailure
114+
);
115+
116+
// Step 2. Get field capabilities necessary to build the information of how to extract fields
117+
ActionListener<Integer> docValueFieldsLimitListener = ActionListener.wrap(
118+
docValueFieldsLimit -> {
119+
docValueFieldsLimitHolder.set(docValueFieldsLimit);
120+
121+
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
122+
fieldCapabilitiesRequest.indices(index);
123+
fieldCapabilitiesRequest.fields("*");
124+
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
125+
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
126+
// This response gets discarded - the listener handles the real response
127+
return null;
128+
});
129+
},
130+
listener::onFailure
131+
);
132+
133+
// Step 1. Get doc value fields limit
134+
getDocValueFieldsLimit(client, index, docValueFieldsLimitListener);
135+
}
136+
137+
private static void getDocValueFieldsLimit(Client client, String index, ActionListener<Integer> docValueFieldsLimitListener) {
138+
ActionListener<GetSettingsResponse> settingsListener = ActionListener.wrap(getSettingsResponse -> {
139+
Integer minDocValueFieldsLimit = Integer.MAX_VALUE;
140+
141+
ImmutableOpenMap<String, Settings> indexToSettings = getSettingsResponse.getIndexToSettings();
142+
Iterator<ObjectObjectCursor<String, Settings>> iterator = indexToSettings.iterator();
143+
while (iterator.hasNext()) {
144+
ObjectObjectCursor<String, Settings> indexSettings = iterator.next();
145+
Integer indexMaxDocValueFields = IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.get(indexSettings.value);
146+
if (indexMaxDocValueFields < minDocValueFieldsLimit) {
147+
minDocValueFieldsLimit = indexMaxDocValueFields;
148+
}
149+
}
150+
docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit);
151+
},
103152
e -> {
104153
if (e instanceof IndexNotFoundException) {
105-
listener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
154+
docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
106155
+ ((IndexNotFoundException) e).getIndex() + " does not exist"));
107156
} else {
108-
listener.onFailure(e);
157+
docValueFieldsLimitListener.onFailure(e);
109158
}
110159
}
111160
);
112161

113-
// Step 1. Get field capabilities necessary to build the information of how to extract fields
114-
FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest();
115-
fieldCapabilitiesRequest.indices(index);
116-
fieldCapabilitiesRequest.fields("*");
117-
ClientHelper.executeWithHeaders(config.getHeaders(), ClientHelper.ML_ORIGIN, client, () -> {
118-
client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, fieldCapabilitiesHandler);
119-
// This response gets discarded - the listener handles the real response
120-
return null;
121-
});
162+
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
163+
getSettingsRequest.indices(index);
164+
getSettingsRequest.includeDefaults(true);
165+
getSettingsRequest.names(IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
166+
client.admin().indices().getSettings(getSettingsRequest, settingsListener);
122167
}
123-
124168
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/extractor/ExtractedFieldsDetector.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
1111
import org.elasticsearch.common.Strings;
1212
import org.elasticsearch.common.regex.Regex;
13+
import org.elasticsearch.index.IndexSettings;
1314
import org.elasticsearch.index.mapper.NumberFieldMapper;
1415
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
1516
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -57,13 +58,15 @@ public class ExtractedFieldsDetector {
5758
private final String index;
5859
private final DataFrameAnalyticsConfig config;
5960
private final boolean isTaskRestarting;
61+
private final int docValueFieldsLimit;
6062
private final FieldCapabilitiesResponse fieldCapabilitiesResponse;
6163

62-
ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting,
64+
ExtractedFieldsDetector(String index, DataFrameAnalyticsConfig config, boolean isTaskRestarting, int docValueFieldsLimit,
6365
FieldCapabilitiesResponse fieldCapabilitiesResponse) {
6466
this.index = Objects.requireNonNull(index);
6567
this.config = Objects.requireNonNull(config);
6668
this.isTaskRestarting = isTaskRestarting;
69+
this.docValueFieldsLimit = docValueFieldsLimit;
6770
this.fieldCapabilitiesResponse = Objects.requireNonNull(fieldCapabilitiesResponse);
6871
}
6972

@@ -86,6 +89,14 @@ public ExtractedFields detect() {
8689
if (extractedFields.getAllFields().isEmpty()) {
8790
throw ExceptionsHelper.badRequestException("No compatible fields could be detected in index [{}]", index);
8891
}
92+
if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) {
93+
extractedFields = fetchFromSourceIfSupported(extractedFields);
94+
if (extractedFields.getDocValueFields().size() > docValueFieldsLimit) {
95+
throw ExceptionsHelper.badRequestException("[{}] fields must be retrieved from doc_values but the limit is [{}]; " +
96+
"please adjust the index level setting [{}]", extractedFields.getDocValueFields().size(), docValueFieldsLimit,
97+
IndexSettings.MAX_DOCVALUE_FIELDS_SEARCH_SETTING.getKey());
98+
}
99+
}
89100
return extractedFields;
90101
}
91102

@@ -141,4 +152,11 @@ private void includeAndExcludeFields(Set<String> fields, String index) {
141152
}
142153
}
143154

155+
private ExtractedFields fetchFromSourceIfSupported(ExtractedFields extractedFields) {
156+
List<ExtractedField> adjusted = new ArrayList<>(extractedFields.getAllFields().size());
157+
for (ExtractedField field : extractedFields.getDocValueFields()) {
158+
adjusted.add(field.supportsFromSource() ? field.newFromSource() : field);
159+
}
160+
return new ExtractedFields(adjusted);
161+
}
144162
}

0 commit comments

Comments
 (0)