Skip to content

Commit 62f8da2

Browse files
[ML] Enable reusing field extraction logic when no time field is required (#35100)
1 parent 9eb5769 commit 62f8da2

File tree

13 files changed

+262
-103
lines changed

13 files changed

+262
-103
lines changed
Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6-
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
6+
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
77

88
import org.elasticsearch.common.document.DocumentField;
99
import org.elasticsearch.search.SearchHit;
@@ -18,7 +18,7 @@
1818
* Represents a field to be extracted by the datafeed.
1919
* It encapsulates the extraction logic.
2020
*/
21-
abstract class ExtractedField {
21+
public abstract class ExtractedField {
2222

2323
public enum ExtractionMethod {
2424
SOURCE, DOC_VALUE, SCRIPT_FIELD
Lines changed: 19 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -3,18 +3,13 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6-
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
6+
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
77

88
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
99
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
10-
import org.elasticsearch.search.SearchHit;
11-
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
12-
import org.elasticsearch.xpack.core.ml.job.config.Job;
13-
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
1410
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
1511

16-
import java.util.ArrayList;
17-
import java.util.Arrays;
12+
import java.util.Collection;
1813
import java.util.Collections;
1914
import java.util.List;
2015
import java.util.Map;
@@ -25,20 +20,15 @@
2520
/**
2621
* The fields the datafeed has to extract
2722
*/
28-
class ExtractedFields {
23+
public class ExtractedFields {
2924

3025
private static final String TEXT = "text";
3126

32-
private final ExtractedField timeField;
3327
private final List<ExtractedField> allFields;
3428
private final List<ExtractedField> docValueFields;
3529
private final String[] sourceFields;
3630

37-
ExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) {
38-
if (!allFields.contains(timeField)) {
39-
throw new IllegalArgumentException("timeField should also be contained in allFields");
40-
}
41-
this.timeField = Objects.requireNonNull(timeField);
31+
public ExtractedFields(List<ExtractedField> allFields) {
4232
this.allFields = Collections.unmodifiableList(allFields);
4333
this.docValueFields = filterFields(ExtractedField.ExtractionMethod.DOC_VALUE, allFields);
4434
this.sourceFields = filterFields(ExtractedField.ExtractionMethod.SOURCE, allFields).stream().map(ExtractedField::getName)
@@ -61,60 +51,33 @@ private static List<ExtractedField> filterFields(ExtractedField.ExtractionMethod
6151
return fields.stream().filter(field -> field.getExtractionMethod() == method).collect(Collectors.toList());
6252
}
6353

64-
public String timeField() {
65-
return timeField.getName();
54+
public static ExtractedFields build(Collection<String> allFields, Set<String> scriptFields,
55+
FieldCapabilitiesResponse fieldsCapabilities) {
56+
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(scriptFields, fieldsCapabilities);
57+
return new ExtractedFields(allFields.stream().map(field -> extractionMethodDetector.detect(field)).collect(Collectors.toList()));
6658
}
6759

68-
public Long timeFieldValue(SearchHit hit) {
69-
Object[] value = timeField.value(hit);
70-
if (value.length != 1) {
71-
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: "
72-
+ Arrays.toString(value));
73-
}
74-
if (value[0] instanceof Long) {
75-
return (Long) value[0];
76-
}
77-
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + value[0]);
78-
}
60+
protected static class ExtractionMethodDetector {
7961

80-
public static ExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
81-
Set<String> scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
82-
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(datafeed.getId(), scriptFields,
83-
fieldsCapabilities);
84-
String timeField = job.getDataDescription().getTimeField();
85-
if (scriptFields.contains(timeField) == false && extractionMethodDetector.isAggregatable(timeField) == false) {
86-
throw ExceptionsHelper.badRequestException("datafeed [" + datafeed.getId() + "] cannot retrieve time field [" + timeField
87-
+ "] because it is not aggregatable");
88-
}
89-
ExtractedField timeExtractedField = ExtractedField.newTimeField(timeField, scriptFields.contains(timeField) ?
90-
ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE);
91-
List<String> remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
92-
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
93-
allExtractedFields.add(timeExtractedField);
94-
remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
95-
return new ExtractedFields(timeExtractedField, allExtractedFields);
96-
}
97-
98-
private static class ExtractionMethodDetector {
99-
100-
private final String datafeedId;
10162
private final Set<String> scriptFields;
10263
private final FieldCapabilitiesResponse fieldsCapabilities;
10364

104-
private ExtractionMethodDetector(String datafeedId, Set<String> scriptFields, FieldCapabilitiesResponse fieldsCapabilities) {
105-
this.datafeedId = datafeedId;
65+
protected ExtractionMethodDetector(Set<String> scriptFields, FieldCapabilitiesResponse fieldsCapabilities) {
10666
this.scriptFields = scriptFields;
10767
this.fieldsCapabilities = fieldsCapabilities;
10868
}
10969

110-
private ExtractedField detect(String field) {
70+
protected ExtractedField detect(String field) {
11171
String internalField = field;
11272
ExtractedField.ExtractionMethod method = ExtractedField.ExtractionMethod.SOURCE;
11373
if (scriptFields.contains(field)) {
11474
method = ExtractedField.ExtractionMethod.SCRIPT_FIELD;
11575
} else if (isAggregatable(field)) {
11676
method = ExtractedField.ExtractionMethod.DOC_VALUE;
117-
} else if (isText(field)) {
77+
if (isFieldOfType(field, "date")) {
78+
return ExtractedField.newTimeField(field, method);
79+
}
80+
} else if (isFieldOfType(field, TEXT)) {
11881
String parentField = MlStrings.getParentField(field);
11982
// Field is text so check if it is a multi-field
12083
if (Objects.equals(parentField, field) == false && fieldsCapabilities.getField(parentField) != null) {
@@ -127,11 +90,10 @@ private ExtractedField detect(String field) {
12790
return ExtractedField.newField(field, internalField, method);
12891
}
12992

130-
private boolean isAggregatable(String field) {
93+
protected boolean isAggregatable(String field) {
13194
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
13295
if (fieldCaps == null || fieldCaps.isEmpty()) {
133-
throw ExceptionsHelper.badRequestException("datafeed [" + datafeedId + "] cannot retrieve field [" + field
134-
+ "] because it has no mappings");
96+
throw new IllegalArgumentException("cannot retrieve field [" + field + "] because it has no mappings");
13597
}
13698
for (FieldCapabilities capsPerIndex : fieldCaps.values()) {
13799
if (!capsPerIndex.isAggregatable()) {
@@ -141,10 +103,10 @@ private boolean isAggregatable(String field) {
141103
return true;
142104
}
143105

144-
private boolean isText(String field) {
106+
private boolean isFieldOfType(String field, String type) {
145107
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
146108
if (fieldCaps != null && fieldCaps.size() == 1) {
147-
return fieldCaps.containsKey(TEXT);
109+
return fieldCaps.containsKey(type);
148110
}
149111
return false;
150112
}
Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
7+
8+
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
9+
import org.elasticsearch.search.SearchHit;
10+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
11+
import org.elasticsearch.xpack.core.ml.job.config.Job;
12+
13+
import java.util.ArrayList;
14+
import java.util.Arrays;
15+
import java.util.List;
16+
import java.util.Objects;
17+
import java.util.Set;
18+
import java.util.stream.Collectors;
19+
20+
/**
21+
* The fields to extract for a datafeed that requires a time field
22+
*/
23+
public class TimeBasedExtractedFields extends ExtractedFields {
24+
25+
private final ExtractedField timeField;
26+
27+
public TimeBasedExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) {
28+
super(allFields);
29+
if (!allFields.contains(timeField)) {
30+
throw new IllegalArgumentException("timeField should also be contained in allFields");
31+
}
32+
this.timeField = Objects.requireNonNull(timeField);
33+
}
34+
35+
public String timeField() {
36+
return timeField.getName();
37+
}
38+
39+
public Long timeFieldValue(SearchHit hit) {
40+
Object[] value = timeField.value(hit);
41+
if (value.length != 1) {
42+
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: "
43+
+ Arrays.toString(value));
44+
}
45+
if (value[0] instanceof Long) {
46+
return (Long) value[0];
47+
}
48+
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + value[0]);
49+
}
50+
51+
public static TimeBasedExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
52+
Set<String> scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
53+
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(scriptFields, fieldsCapabilities);
54+
String timeField = job.getDataDescription().getTimeField();
55+
if (scriptFields.contains(timeField) == false && extractionMethodDetector.isAggregatable(timeField) == false) {
56+
throw new IllegalArgumentException("cannot retrieve time field [" + timeField + "] because it is not aggregatable");
57+
}
58+
ExtractedField timeExtractedField = ExtractedField.newTimeField(timeField, scriptFields.contains(timeField) ?
59+
ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE);
60+
List<String> remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
61+
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
62+
allExtractedFields.add(timeExtractedField);
63+
remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
64+
return new TimeBasedExtractedFields(timeExtractedField, allExtractedFields);
65+
}
66+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.core.ClientHelper;
2323
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
2424
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
25+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
2526

2627
import java.io.ByteArrayInputStream;
2728
import java.io.ByteArrayOutputStream;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorContext.java

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.index.query.QueryBuilder;
99
import org.elasticsearch.search.builder.SearchSourceBuilder;
10+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
1011

1112
import java.util.List;
1213
import java.util.Map;
@@ -15,7 +16,7 @@
1516
class ScrollDataExtractorContext {
1617

1718
final String jobId;
18-
final ExtractedFields extractedFields;
19+
final TimeBasedExtractedFields extractedFields;
1920
final String[] indices;
2021
final String[] types;
2122
final QueryBuilder query;
@@ -25,7 +26,7 @@ class ScrollDataExtractorContext {
2526
final long end;
2627
final Map<String, String> headers;
2728

28-
ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List<String> indices, List<String> types,
29+
ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List<String> indices, List<String> types,
2930
QueryBuilder query, List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize,
3031
long start, long end, Map<String, String> headers) {
3132
this.jobId = Objects.requireNonNull(jobId);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,10 @@
1616
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1717
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
1818
import org.elasticsearch.xpack.core.ml.job.config.Job;
19+
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
1920
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
2021
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
22+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
2123

2224
import java.util.Objects;
2325

@@ -26,9 +28,9 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
2628
private final Client client;
2729
private final DatafeedConfig datafeedConfig;
2830
private final Job job;
29-
private final ExtractedFields extractedFields;
31+
private final TimeBasedExtractedFields extractedFields;
3032

31-
private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, ExtractedFields extractedFields) {
33+
private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, TimeBasedExtractedFields extractedFields) {
3234
this.client = Objects.requireNonNull(client);
3335
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
3436
this.job = Objects.requireNonNull(job);
@@ -56,12 +58,14 @@ public static void create(Client client, DatafeedConfig datafeed, Job job, Actio
5658
// Step 2. Contruct the factory and notify listener
5759
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
5860
fieldCapabilitiesResponse -> {
59-
ExtractedFields extractedFields = ExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
61+
TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
6062
listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, extractedFields));
6163
}, e -> {
6264
if (e instanceof IndexNotFoundException) {
6365
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
6466
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
67+
} else if (e instanceof IllegalArgumentException) {
68+
listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage()));
6569
} else {
6670
listener.onFailure(e);
6771
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/SearchHitToJsonProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,8 @@
99
import org.elasticsearch.common.xcontent.XContentBuilder;
1010
import org.elasticsearch.common.xcontent.json.JsonXContent;
1111
import org.elasticsearch.search.SearchHit;
12+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
13+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
1214

1315
import java.io.IOException;
1416
import java.io.OutputStream;
Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,12 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6-
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
6+
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;
77

88
import org.elasticsearch.search.SearchHit;
99
import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext;
1010
import org.elasticsearch.test.ESTestCase;
11+
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
1112
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
1213
import org.joda.time.DateTime;
1314

0 commit comments

Comments
 (0)