Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;

import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.search.SearchHit;
Expand All @@ -18,7 +18,7 @@
* Represents a field to be extracted by the datafeed.
* It encapsulates the extraction logic.
*/
abstract class ExtractedField {
public abstract class ExtractedField {

public enum ExtractionMethod {
SOURCE, DOC_VALUE, SCRIPT_FIELD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;

import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -25,20 +20,15 @@
/**
* The fields the datafeed has to extract
*/
class ExtractedFields {
public class ExtractedFields {

private static final String TEXT = "text";

private final ExtractedField timeField;
private final List<ExtractedField> allFields;
private final List<ExtractedField> docValueFields;
private final String[] sourceFields;

ExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) {
if (!allFields.contains(timeField)) {
throw new IllegalArgumentException("timeField should also be contained in allFields");
}
this.timeField = Objects.requireNonNull(timeField);
public ExtractedFields(List<ExtractedField> allFields) {
this.allFields = Collections.unmodifiableList(allFields);
this.docValueFields = filterFields(ExtractedField.ExtractionMethod.DOC_VALUE, allFields);
this.sourceFields = filterFields(ExtractedField.ExtractionMethod.SOURCE, allFields).stream().map(ExtractedField::getName)
Expand All @@ -61,60 +51,33 @@ private static List<ExtractedField> filterFields(ExtractedField.ExtractionMethod
return fields.stream().filter(field -> field.getExtractionMethod() == method).collect(Collectors.toList());
}

public String timeField() {
return timeField.getName();
public static ExtractedFields build(Collection<String> allFields, Set<String> scriptFields,
FieldCapabilitiesResponse fieldsCapabilities) {
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(scriptFields, fieldsCapabilities);
return new ExtractedFields(allFields.stream().map(field -> extractionMethodDetector.detect(field)).collect(Collectors.toList()));
}

public Long timeFieldValue(SearchHit hit) {
Object[] value = timeField.value(hit);
if (value.length != 1) {
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: "
+ Arrays.toString(value));
}
if (value[0] instanceof Long) {
return (Long) value[0];
}
throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + value[0]);
}
protected static class ExtractionMethodDetector {

public static ExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
Set<String> scriptFields = datafeed.getScriptFields().stream().map(sf -> sf.fieldName()).collect(Collectors.toSet());
ExtractionMethodDetector extractionMethodDetector = new ExtractionMethodDetector(datafeed.getId(), scriptFields,
fieldsCapabilities);
String timeField = job.getDataDescription().getTimeField();
if (scriptFields.contains(timeField) == false && extractionMethodDetector.isAggregatable(timeField) == false) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeed.getId() + "] cannot retrieve time field [" + timeField
+ "] because it is not aggregatable");
}
ExtractedField timeExtractedField = ExtractedField.newTimeField(timeField, scriptFields.contains(timeField) ?
ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE);
List<String> remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
allExtractedFields.add(timeExtractedField);
remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
return new ExtractedFields(timeExtractedField, allExtractedFields);
}

private static class ExtractionMethodDetector {

private final String datafeedId;
private final Set<String> scriptFields;
private final FieldCapabilitiesResponse fieldsCapabilities;

private ExtractionMethodDetector(String datafeedId, Set<String> scriptFields, FieldCapabilitiesResponse fieldsCapabilities) {
this.datafeedId = datafeedId;
protected ExtractionMethodDetector(Set<String> scriptFields, FieldCapabilitiesResponse fieldsCapabilities) {
this.scriptFields = scriptFields;
this.fieldsCapabilities = fieldsCapabilities;
}

private ExtractedField detect(String field) {
protected ExtractedField detect(String field) {
String internalField = field;
ExtractedField.ExtractionMethod method = ExtractedField.ExtractionMethod.SOURCE;
if (scriptFields.contains(field)) {
method = ExtractedField.ExtractionMethod.SCRIPT_FIELD;
} else if (isAggregatable(field)) {
method = ExtractedField.ExtractionMethod.DOC_VALUE;
} else if (isText(field)) {
if (isFieldOfType(field, "date")) {
return ExtractedField.newTimeField(field, method);
}
} else if (isFieldOfType(field, TEXT)) {
String parentField = MlStrings.getParentField(field);
// Field is text so check if it is a multi-field
if (Objects.equals(parentField, field) == false && fieldsCapabilities.getField(parentField) != null) {
Expand All @@ -127,11 +90,10 @@ private ExtractedField detect(String field) {
return ExtractedField.newField(field, internalField, method);
}

private boolean isAggregatable(String field) {
protected boolean isAggregatable(String field) {
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
if (fieldCaps == null || fieldCaps.isEmpty()) {
throw ExceptionsHelper.badRequestException("datafeed [" + datafeedId + "] cannot retrieve field [" + field
+ "] because it has no mappings");
throw new IllegalArgumentException("cannot retrieve field [" + field + "] because it has no mappings");
}
for (FieldCapabilities capsPerIndex : fieldCaps.values()) {
if (!capsPerIndex.isAggregatable()) {
Expand All @@ -141,10 +103,10 @@ private boolean isAggregatable(String field) {
return true;
}

private boolean isText(String field) {
private boolean isFieldOfType(String field, String type) {
Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
if (fieldCaps != null && fieldCaps.size() == 1) {
return fieldCaps.containsKey(TEXT);
return fieldCaps.containsKey(type);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;

import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * The set of fields a datafeed extracts when a time field is mandatory.
 * Extends {@link ExtractedFields} with accessors for the time field's name
 * and for reading its (epoch millis) value out of a search hit.
 */
public class TimeBasedExtractedFields extends ExtractedFields {

    private final ExtractedField timeField;

    /**
     * @param timeField the field holding the document timestamp; must be one of {@code allFields}
     * @param allFields every field to extract, including {@code timeField}
     * @throws IllegalArgumentException if {@code allFields} does not contain {@code timeField}
     */
    public TimeBasedExtractedFields(ExtractedField timeField, List<ExtractedField> allFields) {
        super(allFields);
        // The time field must be part of the extracted set; reject inconsistent input early.
        if (allFields.contains(timeField) == false) {
            throw new IllegalArgumentException("timeField should also be contained in allFields");
        }
        this.timeField = Objects.requireNonNull(timeField);
    }

    /** @return the name of the time field */
    public String timeField() {
        return timeField.getName();
    }

    /**
     * Extracts the time field's value from the given hit.
     *
     * @return the timestamp as a {@code Long} (epoch millis)
     * @throws RuntimeException if the hit does not carry exactly one long value for the time field
     */
    public Long timeFieldValue(SearchHit hit) {
        Object[] extracted = timeField.value(hit);
        if (extracted.length != 1) {
            throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a single value; actual was: "
                    + Arrays.toString(extracted));
        }
        Object single = extracted[0];
        if (single instanceof Long) {
            return (Long) single;
        }
        throw new RuntimeException("Time field [" + timeField.getAlias() + "] expected a long value; actual was: " + single);
    }

    /**
     * Builds the extracted fields for the given job/datafeed pair, resolving the
     * extraction method of each field from the field capabilities response.
     *
     * @throws IllegalArgumentException if the time field is neither a script field nor aggregatable
     */
    public static TimeBasedExtractedFields build(Job job, DatafeedConfig datafeed, FieldCapabilitiesResponse fieldsCapabilities) {
        Set<String> scriptedFields = datafeed.getScriptFields().stream()
                .map(sf -> sf.fieldName())
                .collect(Collectors.toSet());
        ExtractionMethodDetector detector = new ExtractionMethodDetector(scriptedFields, fieldsCapabilities);
        String timeFieldName = job.getDataDescription().getTimeField();
        boolean timeIsScripted = scriptedFields.contains(timeFieldName);
        // A non-scripted time field must be aggregatable so it can be fetched from doc values.
        if (timeIsScripted == false && detector.isAggregatable(timeFieldName) == false) {
            throw new IllegalArgumentException("cannot retrieve time field [" + timeFieldName + "] because it is not aggregatable");
        }
        ExtractedField timeExtractedField = ExtractedField.newTimeField(timeFieldName,
                timeIsScripted ? ExtractedField.ExtractionMethod.SCRIPT_FIELD : ExtractedField.ExtractionMethod.DOC_VALUE);
        // The time field goes first, followed by every other job input field.
        List<ExtractedField> extractedFields = new ArrayList<>();
        extractedFields.add(timeExtractedField);
        job.allInputFields().stream()
                .filter(f -> f.equals(timeFieldName) == false)
                .forEach(f -> extractedFields.add(detector.detect(f)));
        return new TimeBasedExtractedFields(timeExtractedField, extractedFields);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;

import java.util.List;
import java.util.Map;
Expand All @@ -15,7 +16,7 @@
class ScrollDataExtractorContext {

final String jobId;
final ExtractedFields extractedFields;
final TimeBasedExtractedFields extractedFields;
final String[] indices;
final String[] types;
final QueryBuilder query;
Expand All @@ -25,7 +26,7 @@ class ScrollDataExtractorContext {
final long end;
final Map<String, String> headers;

ScrollDataExtractorContext(String jobId, ExtractedFields extractedFields, List<String> indices, List<String> types,
ScrollDataExtractorContext(String jobId, TimeBasedExtractedFields extractedFields, List<String> indices, List<String> types,
QueryBuilder query, List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize,
long start, long end, Map<String, String> headers) {
this.jobId = Objects.requireNonNull(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;

import java.util.Objects;

Expand All @@ -26,9 +28,9 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
private final Client client;
private final DatafeedConfig datafeedConfig;
private final Job job;
private final ExtractedFields extractedFields;
private final TimeBasedExtractedFields extractedFields;

private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, ExtractedFields extractedFields) {
private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, TimeBasedExtractedFields extractedFields) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
Expand Down Expand Up @@ -56,12 +58,14 @@ public static void create(Client client, DatafeedConfig datafeed, Job job, Actio
// Step 2. Construct the factory and notify listener
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
fieldCapabilitiesResponse -> {
ExtractedFields extractedFields = ExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, extractedFields));
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
} else if (e instanceof IllegalArgumentException) {
listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage()));
} else {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;

import java.io.IOException;
import java.io.OutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;
package org.elasticsearch.xpack.ml.datafeed.extractor.fields;

import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
import org.joda.time.DateTime;

Expand Down
Loading