From 9c359eb3ea9768dee90841c965a453560e15d097 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 29 Nov 2018 14:52:39 -0600 Subject: [PATCH 1/8] Lazily parsing aggs and query in DatafeedConfigs --- .../core/ml/datafeed/DatafeedConfig.java | 166 +++++++++++++++--- .../core/ml/datafeed/DatafeedUpdate.java | 8 +- .../xpack/core/ml/job/messages/Messages.java | 2 + .../xpack/core/ml/utils/ExceptionsHelper.java | 8 + .../ml/utils/XContentObjectTransformer.java | 10 ++ .../core/ml/datafeed/DatafeedConfigTests.java | 30 ++-- .../core/ml/datafeed/DatafeedUpdateTests.java | 4 +- .../ml/integration/DelayedDataDetectorIT.java | 4 +- .../DelayedDataDetectorFactory.java | 2 +- .../AggregationDataExtractorFactory.java | 4 +- .../RollupDataExtractorFactory.java | 8 +- .../chunked/ChunkedDataExtractorFactory.java | 2 +- .../scroll/ScrollDataExtractorFactory.java | 2 +- .../TransportPreviewDatafeedActionTests.java | 2 +- .../datafeed/DatafeedJobValidatorTests.java | 2 +- .../extractor/DataExtractorFactoryTests.java | 16 +- .../AggregationDataExtractorFactoryTests.java | 11 +- .../ChunkedDataExtractorFactoryTests.java | 4 +- .../integration/BasicDistributedJobsIT.java | 2 +- 19 files changed, 215 insertions(+), 72 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 29956fcc259b0..8738e9f9179a0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -13,9 +13,11 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CachedSupplier; import org.elasticsearch.common.xcontent.ObjectParser; import 
org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; @@ -31,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer; import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; import java.io.IOException; @@ -43,6 +46,8 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; /** * Datafeed configuration options. Describes where to proactively pull input @@ -60,6 +65,45 @@ public class DatafeedConfig extends AbstractDiffable implements private static final int TWO_MINS_SECONDS = 2 * SECONDS_IN_MINUTE; private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE; private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE; + static final XContentObjectTransformer QUERY_TRANSFORMER = XContentObjectTransformer.queryBuilderTransformer(); + private static final BiFunction, String, QueryBuilder> lazyQueryParser = (objectMap, id) -> { + try { + return QUERY_TRANSFORMER.fromMap(objectMap); + } catch (IOException | XContentParseException exception) { + // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user + if (exception.getCause() instanceof IllegalArgumentException) { + throw ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, + id, + exception.getCause().getMessage()), + exception.getCause()); + } else { + throw 
ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()), + exception); + } + } + }; + + static final XContentObjectTransformer AGG_TRANSFORMER = XContentObjectTransformer.aggregatorTransformer(); + private static final BiFunction, String, AggregatorFactories.Builder> lazyAggParser = (objectMap, id) -> { + try { + return AGG_TRANSFORMER.fromMap(objectMap); + } catch (IOException | XContentParseException exception) { + // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user + if (exception.getCause() instanceof IllegalArgumentException) { + throw ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, + id, + exception.getCause().getMessage()), + exception.getCause()); + } else { + throw ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()), + exception); + } + } + }; // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("datafeeds"); @@ -102,9 +146,15 @@ private static ObjectParser createParser(boolean ignoreUnknownFie builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY); parser.declareString((builder, val) -> builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY); - parser.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY); - parser.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS); - parser.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS); + if (ignoreUnknownFields) { + parser.declareObject(Builder::setQuery, (p, c) -> p.map(), QUERY); + parser.declareObject(Builder::setAggregations, (p, c) -> p.map(), AGGREGATIONS); + 
parser.declareObject(Builder::setAggregations, (p, c) -> p.map(), AGGS); + } else { + parser.declareObject(Builder::setParsedQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY); + parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS); + parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS); + } parser.declareObject(Builder::setScriptFields, (p, c) -> { List parsedScriptFields = new ArrayList<>(); while (p.nextToken() != XContentParser.Token.END_OBJECT) { @@ -146,16 +196,18 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final List indices; private final List types; - private final QueryBuilder query; - private final AggregatorFactories.Builder aggregations; + private final Map query; + private final Map aggregations; private final List scriptFields; private final Integer scrollSize; private final ChunkingConfig chunkingConfig; private final Map headers; private final DelayedDataCheckConfig delayedDataCheckConfig; + private final CachedSupplier querySupplier; + private final CachedSupplier aggSupplier; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, - QueryBuilder query, AggregatorFactories.Builder aggregations, List scriptFields, + Map query, Map aggregations, List scriptFields, Integer scrollSize, ChunkingConfig chunkingConfig, Map headers, DelayedDataCheckConfig delayedDataCheckConfig) { this.id = id; @@ -171,6 +223,8 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue this.chunkingConfig = chunkingConfig; this.headers = Collections.unmodifiableMap(headers); this.delayedDataCheckConfig = delayedDataCheckConfig; + this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id)); + this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id)); } public 
DatafeedConfig(StreamInput in) throws IOException { @@ -188,8 +242,17 @@ public DatafeedConfig(StreamInput in) throws IOException { } else { this.types = null; } - this.query = in.readNamedWriteable(QueryBuilder.class); - this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new); + if (in.getVersion().before(Version.CURRENT)) { + this.query = QUERY_TRANSFORMER.toMap(in.readNamedWriteable(QueryBuilder.class)); + this.aggregations = AGG_TRANSFORMER.toMap(in.readOptionalWriteable(AggregatorFactories.Builder::new)); + } else { + this.query = in.readMap(); + if (in.readBoolean()) { + this.aggregations = in.readMap(); + } else { + this.aggregations = null; + } + } if (in.readBoolean()) { this.scriptFields = Collections.unmodifiableList(in.readList(SearchSourceBuilder.ScriptField::new)); } else { @@ -207,6 +270,8 @@ public DatafeedConfig(StreamInput in) throws IOException { } else { delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); } + this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id)); + this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id)); } public String getId() { @@ -237,11 +302,19 @@ public Integer getScrollSize() { return scrollSize; } - public QueryBuilder getQuery() { + public QueryBuilder getParsedQuery() { + return querySupplier.get(); + } + + public Map getQuery() { return query; } - public AggregatorFactories.Builder getAggregations() { + public AggregatorFactories.Builder getParsedAggregations() { + return aggSupplier.get(); + } + + public Map getAggregations() { return aggregations; } @@ -249,14 +322,14 @@ public AggregatorFactories.Builder getAggregations() { * Returns the histogram's interval as epoch millis. 
*/ public long getHistogramIntervalMillis() { - return ExtractorUtils.getHistogramIntervalMillis(aggregations); + return ExtractorUtils.getHistogramIntervalMillis(getParsedAggregations()); } /** * @return {@code true} when there are non-empty aggregations, {@code false} otherwise */ public boolean hasAggregations() { - return aggregations != null && aggregations.count() > 0; + return aggregations != null && aggregations.size() > 0; } public List getScriptFields() { @@ -293,8 +366,16 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeBoolean(false); } - out.writeNamedWriteable(query); - out.writeOptionalWriteable(aggregations); + if (out.getVersion().before(Version.CURRENT)) { + out.writeNamedWriteable(getParsedQuery()); + out.writeOptionalWriteable(getParsedAggregations()); + } else { + out.writeMap(query); + out.writeBoolean(aggregations != null); + if (aggregations != null) { + out.writeMap(aggregations); + } + } if (scriptFields != null) { out.writeBoolean(true); out.writeList(scriptFields); @@ -454,8 +535,8 @@ public static class Builder { private TimeValue frequency; private List indices = Collections.emptyList(); private List types = Collections.emptyList(); - private QueryBuilder query = QueryBuilders.matchAllQuery(); - private AggregatorFactories.Builder aggregations; + private Map query; + private Map aggregations; private List scriptFields; private Integer scrollSize = DEFAULT_SCROLL_SIZE; private ChunkingConfig chunkingConfig; @@ -463,6 +544,9 @@ public static class Builder { private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); public Builder() { + try { + this.query = QUERY_TRANSFORMER.toMap(QueryBuilders.matchAllQuery()); + } catch (IOException ex) { /*Should never happen*/ } } public Builder(String id, String jobId) { @@ -517,11 +601,47 @@ public void setFrequency(TimeValue frequency) { this.frequency = frequency; } - public void setQuery(QueryBuilder query) { + 
public void setParsedQuery(QueryBuilder query) { + try { + setQuery(QUERY_TRANSFORMER.toMap(ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName()))); + } catch (IOException | IllegalArgumentException exception) { + if (exception.getCause() instanceof IllegalArgumentException) { + // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user + throw ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, + id, + exception.getCause().getMessage()), + exception.getCause()); + } else { + throw ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()), exception); + } + } + } + + public void setQuery(Map query) { this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName()); } - public void setAggregations(AggregatorFactories.Builder aggregations) { + public void setParsedAggregations(AggregatorFactories.Builder aggregations) { + try { + setAggregations(AGG_TRANSFORMER.toMap(aggregations)); + } catch (IOException | IllegalArgumentException exception) { + // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user + if (exception.getCause() instanceof IllegalArgumentException) { + throw ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, + id, + exception.getCause().getMessage()), + exception.getCause()); + } else { + throw ExceptionsHelper.unprocessableEntityException( + Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()), exception); + } + } + } + + public void setAggregations(Map aggregations) { this.aggregations = aggregations; } @@ -564,14 +684,16 @@ public DatafeedConfig build() { throw invalidOptionValue(TYPES.getPreferredName(), types); } - validateAggregations(); - setDefaultChunkingConfig(); + 
AggregatorFactories.Builder parsedAggs = lazyAggParser.apply(aggregations, id); + validateAggregations(parsedAggs); + setDefaultChunkingConfig(parsedAggs); + setDefaultQueryDelay(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, chunkingConfig, headers, delayedDataCheckConfig); } - void validateAggregations() { + void validateAggregations(AggregatorFactories.Builder aggregations) { if (aggregations == null) { return; } @@ -625,7 +747,7 @@ private static void checkHistogramIntervalIsPositive(AggregationBuilder histogra } } - private void setDefaultChunkingConfig() { + private void setDefaultChunkingConfig(AggregatorFactories.Builder aggregations) { if (chunkingConfig == null) { if (aggregations == null) { chunkingConfig = ChunkingConfig.newAuto(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 25a97d081e683..c1a975342651d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -295,10 +295,10 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map h builder.setTypes(types); } if (query != null) { - builder.setQuery(query); + builder.setParsedQuery(query); } if (aggregations != null) { - builder.setAggregations(aggregations); + builder.setParsedAggregations(aggregations); } if (scriptFields != null) { builder.setScriptFields(scriptFields); @@ -371,9 +371,9 @@ boolean isNoop(DatafeedConfig datafeed) { && (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay())) && (indices == null || Objects.equals(indices, datafeed.getIndices())) && (types == null || Objects.equals(types, datafeed.getTypes())) - && (query == null || Objects.equals(query, 
datafeed.getQuery())) + && (query == null || Objects.equals(query, datafeed.getParsedQuery())) && (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay())) - && (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations())) + && (aggregations == null || Objects.equals(aggregations, datafeed.getParsedAggregations())) && (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields())) && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig())) && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index fcec1ff32f906..038b9a7a1edd1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -26,6 +26,8 @@ public final class Messages { "delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]"; public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS = "delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]"; + public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable: {1}"; + public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable: {1}"; public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency"; public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java index d5b83d25ce315..0b3203cab4a6d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java @@ -54,6 +54,14 @@ public static ElasticsearchStatusException badRequestException(String msg, Objec return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args); } + public static ElasticsearchStatusException unprocessableEntityException(String msg, Throwable cause, Object... args) { + return new ElasticsearchStatusException(msg, RestStatus.UNPROCESSABLE_ENTITY, cause, args); + } + + public static ElasticsearchStatusException unprocessableEntityException(String msg, Object... args) { + return new ElasticsearchStatusException(msg, RestStatus.UNPROCESSABLE_ENTITY, args); + } + /** * Creates an error message that explains there are shard failures, displays info * for the first failure (shard/reason) and kindly asks to see more info in the logs diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java index 00453d3680fe9..ca47e80b4d0c9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java @@ -55,12 +55,19 @@ public static XContentObjectTransformer queryBuilderTransformer() return new XContentObjectTransformer<>(searchRegistry, AbstractQueryBuilder::parseInnerQueryBuilder); } + public static XContentObjectTransformer withSearchRegistry(CheckedFunction parserFunction) { + return new XContentObjectTransformer<>(searchRegistry, parserFunction); + } + XContentObjectTransformer(NamedXContentRegistry registry, 
CheckedFunction parserFunction) { this.parserFunction = parserFunction; this.registry = registry; } public T fromMap(Map stringObjectMap) throws IOException { + if (stringObjectMap == null) { + return null; + } LoggingDeprecationAccumulationHandler deprecationLogger = new LoggingDeprecationAccumulationHandler(); try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(stringObjectMap); XContentParser parser = XContentType.JSON @@ -74,6 +81,9 @@ public T fromMap(Map stringObjectMap) throws IOException { } public Map toMap(T object) throws IOException { + if (object == null) { + return null; + } try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) { XContentBuilder content = object.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); return XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index fe7c5b1a1d104..a16d23af57c56 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -67,7 +67,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b builder.setIndices(randomStringList(1, 10)); builder.setTypes(randomStringList(0, 10)); if (randomBoolean()) { - builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10))); + builder.setParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10))); } boolean addScriptFields = randomBoolean(); if (addScriptFields) { @@ -91,7 +91,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); 
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets") .interval(aggHistogramInterval).subAggregation(maxTime).field("time")); - builder.setAggregations(aggs); + builder.setParsedAggregations(aggs); } if (randomBoolean()) { builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); @@ -274,7 +274,7 @@ public void testBuild_GivenScriptFieldsAndAggregations() { datafeed.setTypes(Collections.singletonList("my_type")); datafeed.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField(randomAlphaOfLength(10), mockScript(randomAlphaOfLength(10)), randomBoolean()))); - datafeed.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo"))); + datafeed.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo"))); ElasticsearchException e = expectThrows(ElasticsearchException.class, datafeed::build); @@ -295,7 +295,7 @@ public void testHasAggregations_NonEmpty() { builder.setIndices(Collections.singletonList("myIndex")); builder.setTypes(Collections.singletonList("myType")); MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - builder.setAggregations(new AggregatorFactories.Builder().addAggregator( + builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator( AggregationBuilders.dateHistogram("time").interval(300000).subAggregation(maxTime).field("time"))); DatafeedConfig datafeedConfig = builder.build(); @@ -306,7 +306,7 @@ public void testBuild_GivenEmptyAggregations() { DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1"); builder.setIndices(Collections.singletonList("myIndex")); builder.setTypes(Collections.singletonList("myType")); - builder.setAggregations(new AggregatorFactories.Builder()); + builder.setParsedAggregations(new AggregatorFactories.Builder()); ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build); @@ -318,13 +318,13 @@ public void 
testBuild_GivenHistogramWithDefaultInterval() { builder.setIndices(Collections.singletonList("myIndex")); builder.setTypes(Collections.singletonList("myType")); MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - builder.setAggregations(new AggregatorFactories.Builder().addAggregator( + builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("time").subAggregation(maxTime).field("time")) ); ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build); - assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0")); + assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]")); } public void testBuild_GivenDateHistogramWithInvalidTimeZone() { @@ -341,7 +341,7 @@ public void testBuild_GivenDateHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> createDatafeedWithDateHistogram((String) null)); - assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0")); + assertThat(e.getMessage(), containsString("[interval] must be 1 or greater for histogram aggregation [buckets]")); } public void testBuild_GivenValidDateHistogram() { @@ -403,8 +403,8 @@ public void testValidateAggregations_GivenMulitpleHistogramAggs() { toplevelTerms.subAggregation(dateHistogram); DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar"); - builder.setAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms)); - ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> builder.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms))); assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage()); } @@ -520,7 +520,7 @@ private static 
DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggre DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1"); builder.setIndices(Collections.singletonList("myIndex")); builder.setTypes(Collections.singletonList("myType")); - builder.setAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram)); + builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram)); return builder.build(); } @@ -556,11 +556,11 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept break; case 6: BoolQueryBuilder query = new BoolQueryBuilder(); - if (instance.getQuery() != null) { - query.must(instance.getQuery()); + if (instance.getParsedQuery() != null) { + query.must(instance.getParsedQuery()); } query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))); - builder.setQuery(query); + builder.setParsedQuery(query); break; case 7: if (instance.hasAggregations()) { @@ -571,7 +571,7 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept aggBuilder .addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField).interval(between(10000, 3600000)) .subAggregation(new MaxAggregationBuilder(timeField).field(timeField))); - builder.setAggregations(aggBuilder); + builder.setParsedAggregations(aggBuilder); if (instance.getScriptFields().isEmpty() == false) { builder.setScriptFields(Collections.emptyList()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index d501dde4eec41..bf61ed541aebb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -167,7 +167,7 @@ public void 
testApply_givenFullUpdateNoAggregations() { assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_2"))); assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42))); assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142))); - assertThat(updatedDatafeed.getQuery(), equalTo(QueryBuilders.termQuery("a", "b"))); + assertThat(updatedDatafeed.getParsedQuery(), equalTo(QueryBuilders.termQuery("a", "b"))); assertThat(updatedDatafeed.hasAggregations(), is(false)); assertThat(updatedDatafeed.getScriptFields(), equalTo(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false)))); @@ -192,7 +192,7 @@ public void testApply_givenAggregations() { assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1"))); assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_1"))); - assertThat(updatedDatafeed.getAggregations(), + assertThat(updatedDatafeed.getParsedAggregations(), equalTo(new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime)))); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java index 82b35da006d23..ddebbe6038f19 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -153,13 +153,13 @@ public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); - 
datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator( + datafeedConfigBuilder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("time") .subAggregation(maxTime) .subAggregation(avgAggregationBuilder) .field("time") .interval(TimeValue.timeValueMinutes(5).millis()))); - datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2)); + datafeedConfigBuilder.setParsedQuery(new RangeQueryBuilder("value").gte(numDocs/2)); datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java index 6cf1ffac1c1c2..37f439df7c2d4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java @@ -44,7 +44,7 @@ public static DelayedDataDetector buildDetector(Job job, DatafeedConfig datafeed window, job.getId(), job.getDataDescription().getTimeField(), - datafeedConfig.getQuery(), + datafeedConfig.getParsedQuery(), datafeedConfig.getIndices().toArray(new String[0]), client); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index a4322275e039b..376e9507dcb7c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -35,8 +35,8 @@ public DataExtractor newExtractor(long start, long end) { job.getAnalysisConfig().analysisFields(), datafeedConfig.getIndices(), datafeedConfig.getTypes(), - datafeedConfig.getQuery(), - datafeedConfig.getAggregations(), + datafeedConfig.getParsedQuery(), + datafeedConfig.getParsedAggregations(), Intervals.alignToCeil(start, histogramInterval), Intervals.alignToFloor(end, histogramInterval), job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java index c8a96d6c306af..f0ee22ce85eae 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java @@ -57,8 +57,8 @@ public DataExtractor newExtractor(long start, long end) { job.getAnalysisConfig().analysisFields(), datafeedConfig.getIndices(), datafeedConfig.getTypes(), - datafeedConfig.getQuery(), - datafeedConfig.getAggregations(), + datafeedConfig.getParsedQuery(), + datafeedConfig.getParsedAggregations(), Intervals.alignToCeil(start, histogramInterval), Intervals.alignToFloor(end, histogramInterval), job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT), @@ -73,7 +73,7 @@ public static void create(Client client, ActionListener listener) { final AggregationBuilder datafeedHistogramAggregation = getHistogramAggregation( - datafeed.getAggregations().getAggregatorFactories()); + datafeed.getParsedAggregations().getAggregatorFactories()); if ((datafeedHistogramAggregation 
instanceof DateHistogramAggregationBuilder) == false) { listener.onFailure( new IllegalArgumentException("Rollup requires that the datafeed configuration use a [date_histogram] aggregation," + @@ -104,7 +104,7 @@ public static void create(Client client, return; } final List flattenedAggs = new ArrayList<>(); - flattenAggregations(datafeed.getAggregations().getAggregatorFactories(), datafeedHistogramAggregation, flattenedAggs); + flattenAggregations(datafeed.getParsedAggregations().getAggregatorFactories(), datafeedHistogramAggregation, flattenedAggs); if (validIntervalCaps.stream().noneMatch(rollupJobConfig -> hasAggregations(rollupJobConfig, flattenedAggs))) { listener.onFailure( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index 67079cf2e6777..68161507ed742 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -36,7 +36,7 @@ public DataExtractor newExtractor(long start, long end) { job.getDataDescription().getTimeField(), datafeedConfig.getIndices(), datafeedConfig.getTypes(), - datafeedConfig.getQuery(), + datafeedConfig.getParsedQuery(), datafeedConfig.getScrollSize(), timeAligner.alignToCeil(start), timeAligner.alignToFloor(end), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java index 67689bd51b8b5..986387c2ed808 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorFactory.java @@ -44,7 +44,7 @@ public DataExtractor newExtractor(long start, long end) { extractedFields, datafeedConfig.getIndices(), datafeedConfig.getTypes(), - datafeedConfig.getQuery(), + datafeedConfig.getParsedQuery(), datafeedConfig.getScriptFields(), datafeedConfig.getScrollSize(), start, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java index 50a016f6e5e0a..b2f1107590712 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java @@ -81,7 +81,7 @@ public void testBuildPreviewDatafeed_GivenAggregations() { DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("no_aggs_feed", "job_foo"); datafeed.setIndices(Collections.singletonList("my_index")); MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - datafeed.setAggregations(AggregatorFactories.builder().addAggregator( + datafeed.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time"))); datafeed.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1))); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java index 1507e106c61d3..4007671bbbc92 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java @@ 
-222,7 +222,7 @@ private static DatafeedConfig.Builder createValidDatafeedConfigWithAggs(double i HistogramAggregationBuilder histogram = AggregationBuilders.histogram("time").interval(interval).field("time").subAggregation(maxTime); DatafeedConfig.Builder datafeedConfig = createValidDatafeedConfig(); - datafeedConfig.setAggregations(new AggregatorFactories.Builder().addAggregator(histogram)); + datafeedConfig.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(histogram)); return datafeedConfig; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java index 9e229e2b057f1..1478a485cc44e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java @@ -143,7 +143,7 @@ public void testCreateDataExtractorFactoryGivenDefaultAggregation() { jobBuilder.setDataDescription(dataDescription); DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time"))); ActionListener listener = ActionListener.wrap( @@ -162,7 +162,7 @@ public void testCreateDataExtractorFactoryGivenAggregationWithOffChunk() { DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - 
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time"))); ActionListener listener = ActionListener.wrap( @@ -180,7 +180,7 @@ public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() jobBuilder.setDataDescription(dataDescription); DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time"))); datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); @@ -203,7 +203,7 @@ public void testCreateDataExtractorFactoryGivenRollupAndValidAggregation() { MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); ActionListener listener = ActionListener.wrap( dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(RollupDataExtractorFactory.class)), @@ -223,7 +223,7 @@ public void testCreateDataExtractorFactoryGivenRollupAndValidAggregationAndAutoC MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); MaxAggregationBuilder myField = 
AggregationBuilders.max("myField").field("myField"); TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); ActionListener listener = ActionListener.wrap( dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)), @@ -263,7 +263,7 @@ public void testCreateDataExtractorFactoryGivenRollupWithBadInterval() { MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); ActionListener listener = ActionListener.wrap( dataExtractorFactory -> fail(), @@ -288,7 +288,7 @@ public void testCreateDataExtractorFactoryGivenRollupMissingTerms() { MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); 
ActionListener listener = ActionListener.wrap( dataExtractorFactory -> fail(), @@ -312,7 +312,7 @@ public void testCreateDataExtractorFactoryGivenRollupMissingMetric() { MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("otherField"); TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); ActionListener listener = ActionListener.wrap( dataExtractorFactory -> fail(), diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java index 8f4aad57c3ffd..cfa20113e9bfd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.junit.Before; +import java.io.IOException; import java.util.Arrays; import java.util.Date; @@ -31,7 +32,7 @@ public void setUpMocks() { client = mock(Client.class); } - public void testNewExtractor_GivenAlignedTimes() { + public void testNewExtractor_GivenAlignedTimes() throws IOException { AggregationDataExtractorFactory factory = createFactory(1000L); AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(2000, 5000); @@ -40,7 +41,7 @@ public void 
testNewExtractor_GivenAlignedTimes() { assertThat(dataExtractor.getContext().end, equalTo(5000L)); } - public void testNewExtractor_GivenNonAlignedTimes() { + public void testNewExtractor_GivenNonAlignedTimes() throws IOException { AggregationDataExtractorFactory factory = createFactory(1000L); AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(3980, 9200); @@ -49,7 +50,7 @@ public void testNewExtractor_GivenNonAlignedTimes() { assertThat(dataExtractor.getContext().end, equalTo(9000L)); } - private AggregationDataExtractorFactory createFactory(long histogramInterval) { + private AggregationDataExtractorFactory createFactory(long histogramInterval) throws IOException { AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation( AggregationBuilders.max("time").field("time"))); @@ -64,8 +65,8 @@ private AggregationDataExtractorFactory createFactory(long histogramInterval) { jobBuilder.setDataDescription(dataDescription); jobBuilder.setAnalysisConfig(analysisConfig); DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId()); - datafeedConfigBuilder.setAggregations(aggs); + datafeedConfigBuilder.setParsedAggregations(aggs); datafeedConfigBuilder.setIndices(Arrays.asList("my_index")); return new AggregationDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date())); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java index 3dc2364cc2a0b..77a8c936beb37 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactoryTests.java @@ -91,8 +91,8 @@ private ChunkedDataExtractorFactory createFactory(long histogramInterval) { jobBuilder.setDataDescription(dataDescription); jobBuilder.setAnalysisConfig(analysisConfig); DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId()); - datafeedConfigBuilder.setAggregations(aggs); + datafeedConfigBuilder.setParsedAggregations(aggs); datafeedConfigBuilder.setIndices(Arrays.asList("my_index")); return new ChunkedDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()), dataExtractorFactory); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index c86db02ca807e..0ef76131bd652 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -98,7 +98,7 @@ public void testFailOverBasics_withDataFeeder() throws Exception { HistogramAggregationBuilder histogramAggregation = AggregationBuilders.histogram("time").interval(60000) .subAggregation(maxAggregation).field("time"); - configBuilder.setAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation)); + configBuilder.setParsedAggregations(AggregatorFactories.builder().addAggregator(histogramAggregation)); configBuilder.setFrequency(TimeValue.timeValueMinutes(2)); DatafeedConfig config = configBuilder.build(); PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); From 21565c819fece4ebde0da4f41e6981299ace0c1e Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 30 Nov 2018 08:36:08 -0600 Subject: [PATCH 2/8] 
Adding parser tests --- .../core/ml/datafeed/DatafeedConfig.java | 5 +- .../ml/utils/XContentObjectTransformer.java | 4 - .../core/ml/datafeed/DatafeedConfigTests.java | 75 +++++++++++++++++++ 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 8738e9f9179a0..302a06afcb5d3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -47,7 +47,6 @@ import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.function.Function; /** * Datafeed configuration options. Describes where to proactively pull input @@ -619,7 +618,7 @@ public void setParsedQuery(QueryBuilder query) { } } - public void setQuery(Map query) { + void setQuery(Map query) { this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName()); } @@ -641,7 +640,7 @@ public void setParsedAggregations(AggregatorFactories.Builder aggregations) { } } - public void setAggregations(Map aggregations) { + void setAggregations(Map aggregations) { this.aggregations = aggregations; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java index ca47e80b4d0c9..5d25b9d71e618 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java @@ -55,10 +55,6 @@ public static XContentObjectTransformer queryBuilderTransformer() return new XContentObjectTransformer<>(searchRegistry, 
AbstractQueryBuilder::parseInnerQueryBuilder); } - public static XContentObjectTransformer withSearchRegistry(CheckedFunction parserFunction) { - return new XContentObjectTransformer<>(searchRegistry, parserFunction); - } - XContentObjectTransformer(NamedXContentRegistry registry, CheckedFunction parserFunction) { this.parserFunction = parserFunction; this.registry = registry; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index a16d23af57c56..18f5e6dbb81e6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -155,6 +155,43 @@ protected DatafeedConfig doParseInstance(XContentParser parser) { " \"scroll_size\": 1234\n" + "}"; + private static final String ANACHRONISTIC_QUERY_DATAFEED = "{\n" + + " \"datafeed_id\": \"farequote-datafeed\",\n" + + " \"job_id\": \"farequote\",\n" + + " \"frequency\": \"1h\",\n" + + " \"indices\": [\"farequote1\", \"farequote2\"],\n" + + //query:match:type stopped being supported in 6.x + " \"query\": {\"match\" : {\"query\":\"fieldName\", \"type\": \"phrase\"}},\n" + + " \"scroll_size\": 1234\n" + + "}"; + + private static final String ANACHRONISTIC_AGG_DATAFEED = "{\n" + + " \"datafeed_id\": \"farequote-datafeed\",\n" + + " \"job_id\": \"farequote\",\n" + + " \"frequency\": \"1h\",\n" + + " \"indices\": [\"farequote1\", \"farequote2\"],\n" + + " \"aggregations\": {\n" + + " \"buckets\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time\",\n" + + " \"interval\": \"360s\",\n" + + " \"time_zone\": \"UTC\"\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"time\": {\n" + + " \"max\": {\"field\": \"time\"}\n" + + " },\n" + + " \"airline\": {\n" + + " \"terms\": {\n" + + " \"field\": \"airline\",\n" + + 
" \"size\": 0\n" + //size: 0 stopped being supported in 6.x + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + public void testFutureConfigParse() throws IOException { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED); @@ -163,6 +200,44 @@ public void testFutureConfigParse() throws IOException { assertEquals("[6:5] [datafeed_config] unknown field [tomorrows_technology_today], parser not found", e.getMessage()); } + public void testPastQueryConfigParse() throws IOException { + try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) { + + DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build(); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedQuery()); + assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getMessage()); + } + + try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) { + + XContentParseException e = expectThrows(XContentParseException.class, + () -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build()); + assertEquals("[6:25] [datafeed_config] failed to parse field [query]", e.getMessage()); + } + } + + public void testPastAggConfigParse() throws IOException { + try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) { + + DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> 
configBuilder.build()); + assertEquals( + "Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. Found [0] in [airline]", + e.getMessage()); + } + + try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) { + + XContentParseException e = expectThrows(XContentParseException.class, + () -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build()); + assertEquals("[8:25] [datafeed_config] failed to parse field [aggregations]", e.getMessage()); + } + } + public void testFutureMetadataParse() throws IOException { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED); From 6eccdfd2d3c647695416f8c3a7a7d31c89bf4bc2 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 30 Nov 2018 08:59:02 -0600 Subject: [PATCH 3/8] Fixing exception types && unneccessary checked ex --- .../xpack/core/ml/datafeed/DatafeedConfig.java | 4 ++-- .../aggregation/AggregationDataExtractorFactoryTests.java | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 302a06afcb5d3..34bc59bf3ae34 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -603,7 +603,7 @@ public void setFrequency(TimeValue frequency) { public void setParsedQuery(QueryBuilder query) { try { setQuery(QUERY_TRANSFORMER.toMap(ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName()))); - } catch (IOException | IllegalArgumentException exception) { + } catch 
(IOException | XContentParseException exception) { if (exception.getCause() instanceof IllegalArgumentException) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user throw ExceptionsHelper.unprocessableEntityException( @@ -625,7 +625,7 @@ void setQuery(Map query) { public void setParsedAggregations(AggregatorFactories.Builder aggregations) { try { setAggregations(AGG_TRANSFORMER.toMap(aggregations)); - } catch (IOException | IllegalArgumentException exception) { + } catch (IOException | XContentParseException exception) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user if (exception.getCause() instanceof IllegalArgumentException) { throw ExceptionsHelper.unprocessableEntityException( diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java index cfa20113e9bfd..c9a2e8712e243 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactoryTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.junit.Before; -import java.io.IOException; import java.util.Arrays; import java.util.Date; @@ -32,7 +31,7 @@ public void setUpMocks() { client = mock(Client.class); } - public void testNewExtractor_GivenAlignedTimes() throws IOException { + public void testNewExtractor_GivenAlignedTimes() { AggregationDataExtractorFactory factory = createFactory(1000L); AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(2000, 5000); @@ -41,7 +40,7 @@ public void 
testNewExtractor_GivenAlignedTimes() throws IOException { assertThat(dataExtractor.getContext().end, equalTo(5000L)); } - public void testNewExtractor_GivenNonAlignedTimes() throws IOException { + public void testNewExtractor_GivenNonAlignedTimes() { AggregationDataExtractorFactory factory = createFactory(1000L); AggregationDataExtractor dataExtractor = (AggregationDataExtractor) factory.newExtractor(3980, 9200); @@ -50,7 +49,7 @@ public void testNewExtractor_GivenNonAlignedTimes() throws IOException { assertThat(dataExtractor.getContext().end, equalTo(9000L)); } - private AggregationDataExtractorFactory createFactory(long histogramInterval) throws IOException { + private AggregationDataExtractorFactory createFactory(long histogramInterval) { AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator( AggregationBuilders.histogram("time").field("time").interval(histogramInterval).subAggregation( AggregationBuilders.max("time").field("time"))); From 614e21b4490b0509c863b70bb53f1b364aa4795d Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 30 Nov 2018 13:19:36 -0600 Subject: [PATCH 4/8] Adding semi aggregation parser --- .../core/ml/datafeed/DatafeedConfig.java | 160 ++++++++++++++---- .../core/ml/datafeed/DatafeedConfigTests.java | 18 +- 2 files changed, 138 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 34bc59bf3ae34..8b91bed8f5b78 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -22,10 +22,10 @@ import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import 
org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; -import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -38,9 +38,9 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -321,7 +321,7 @@ public Map getAggregations() { * Returns the histogram's interval as epoch millis. 
*/ public long getHistogramIntervalMillis() { - return ExtractorUtils.getHistogramIntervalMillis(getParsedAggregations()); + return Builder.getHistogramAggregation(aggregations).getInterval(); } /** @@ -683,16 +683,15 @@ public DatafeedConfig build() { throw invalidOptionValue(TYPES.getPreferredName(), types); } - AggregatorFactories.Builder parsedAggs = lazyAggParser.apply(aggregations, id); - validateAggregations(parsedAggs); - setDefaultChunkingConfig(parsedAggs); + validateAggregations(); + setDefaultChunkingConfig(); setDefaultQueryDelay(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, chunkingConfig, headers, delayedDataCheckConfig); } - void validateAggregations(AggregatorFactories.Builder aggregations) { + void validateAggregations() { if (aggregations == null) { return; } @@ -700,36 +699,39 @@ void validateAggregations(AggregatorFactories.Builder aggregations) { throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS)); } - Collection aggregatorFactories = aggregations.getAggregatorFactories(); - if (aggregatorFactories.isEmpty()) { + + SemiParsedAggregation histogramAggregation = getHistogramAggregation(aggregations); + if (histogramAggregation == null || histogramAggregation.isHistogram() == false) { throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM); } - - AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories); - checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations()); + if (histogramAggregation.getTimeZone() != null && histogramAggregation.getTimeZone().equals("UTC") == false) { + throw ExceptionsHelper.badRequestException("ML requires date_histogram.time_zone to be UTC"); + } + checkNoMoreHistogramAggregations(histogramAggregation.subAggregations); 
checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation); checkHistogramIntervalIsPositive(histogramAggregation); } - private static void checkNoMoreHistogramAggregations(Collection aggregations) { - for (AggregationBuilder agg : aggregations) { - if (ExtractorUtils.isHistogram(agg)) { + private static void checkNoMoreHistogramAggregations(Map aggregations) { + if (aggregations == null || aggregations.isEmpty()) { + return; + } + for (Map.Entry agg : aggregations.entrySet()) { + SemiParsedAggregation semiParsedAggregation = new SemiParsedAggregation(agg); + if (semiParsedAggregation.isHistogram()) { throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM); } - checkNoMoreHistogramAggregations(agg.getSubAggregations()); + checkNoMoreHistogramAggregations(semiParsedAggregation.subAggregations); } } - static void checkHistogramAggregationHasChildMaxTimeAgg(AggregationBuilder histogramAggregation) { - String timeField = null; - if (histogramAggregation instanceof ValuesSourceAggregationBuilder) { - timeField = ((ValuesSourceAggregationBuilder) histogramAggregation).field(); - } + static void checkHistogramAggregationHasChildMaxTimeAgg(SemiParsedAggregation histogramAggregation) { + String timeField = histogramAggregation.getField(); - for (AggregationBuilder agg : histogramAggregation.getSubAggregations()) { - if (agg instanceof MaxAggregationBuilder) { - MaxAggregationBuilder maxAgg = (MaxAggregationBuilder)agg; - if (maxAgg.field().equals(timeField)) { + for (Map.Entry agg : histogramAggregation.subAggregations.entrySet()) { + SemiParsedAggregation semiParsedAggregation = new SemiParsedAggregation(agg); + if (semiParsedAggregation.type != null && semiParsedAggregation.type.equals(MaxAggregationBuilder.NAME)) { + if (semiParsedAggregation.getField() != null && semiParsedAggregation.getField().equals(timeField)) { return; } } @@ -739,19 +741,18 @@ static void 
checkHistogramAggregationHasChildMaxTimeAgg(AggregationBuilder histo Messages.getMessage(Messages.DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION, timeField)); } - private static void checkHistogramIntervalIsPositive(AggregationBuilder histogramAggregation) { - long interval = ExtractorUtils.getHistogramIntervalMillis(histogramAggregation); - if (interval <= 0) { + private static void checkHistogramIntervalIsPositive(SemiParsedAggregation histogramAggregation) { + if (histogramAggregation.getInterval() <= 0) { throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO); } } - private void setDefaultChunkingConfig(AggregatorFactories.Builder aggregations) { + private void setDefaultChunkingConfig() { if (chunkingConfig == null) { if (aggregations == null) { chunkingConfig = ChunkingConfig.newAuto(); } else { - long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(aggregations); + long histogramIntervalMillis = getHistogramAggregation(aggregations).getInterval(); chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis( DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis)); } @@ -771,5 +772,102 @@ private static ElasticsearchException invalidOptionValue(String fieldName, Objec String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value); throw ExceptionsHelper.badRequestException(msg); } + + private static SemiParsedAggregation getHistogramAggregation(Map aggregations) { + if (aggregations == null || aggregations.isEmpty()) { + return null; + } + if (aggregations.size() != 1) { + throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM_NO_SIBLINGS); + } + + SemiParsedAggregation agg = new SemiParsedAggregation(aggregations.entrySet().iterator().next()); + if (agg.isHistogram()) { + return agg; + } + return getHistogramAggregation(agg.subAggregations); + } + } + + /** + * Class for 
Semi-parsing aggregations that are of the form {@code Map} + *

+ * None of the regular Aggregation parsing validations are done from this class, only really used for ML specific validations + */ + static class SemiParsedAggregation { + final String name; + final String type; + final Map subAggregations = new HashMap<>(); + private final Map aggOptions; + + /** + * Parse out a {@code Map.Entry} formatted aggregation into a semi-parsed Aggregation so that + * fields used in our validations are readily available. + * + * @param agg The single entry of the form {@code {"aggname":{"aggtype":{options}, "aggs|aggregations"{subaggs}}}} + * @throws IllegalArgumentException + */ + @SuppressWarnings("unchecked") + SemiParsedAggregation(Map.Entry agg) { + String foundType = null; + Map foundAggOptions = new HashMap<>(); + if (agg.getValue() instanceof Map == false) { + throw new IllegalArgumentException("Aggregation [" + agg.getKey() + "] definition incorrectly formatted"); + } + Map definition = (Map) agg.getValue(); + for (Map.Entry entry : definition.entrySet()) { + switch (entry.getKey()) { + case "aggregations": + case "aggs": + if (entry.getValue() instanceof Map == false) { + throw new IllegalArgumentException( + "Aggregation [" + agg.getKey() + "] sub aggregation definition incorrectly formatted"); + } + Map subAgg = (Map) entry.getValue(); + subAgg.forEach((name, value) -> subAggregations.merge(name, value, (oldValue, newValue) -> newValue)); + break; + case "meta": // we don't care about meta information + break; + default: + if (entry.getValue() instanceof Map == false) { + throw new IllegalArgumentException("Aggregation [" + agg.getKey() + "] options incorrectly formatted"); + } + foundType = entry.getKey(); + foundAggOptions = (Map) entry.getValue(); + } + } + name = agg.getKey(); + type = foundType; + aggOptions = foundAggOptions; + } + + boolean isHistogram() { + return type != null && (type.equals(HistogramAggregationBuilder.NAME) || type.equals(DateHistogramAggregationBuilder.NAME)); + } + + long getInterval() { + if 
(aggOptions.containsKey("interval")) { + if (aggOptions.get("interval") instanceof Number) { + return ((Number) aggOptions.get("interval")).longValue(); + } else { + return ExtractorUtils.validateAndGetCalendarInterval(aggOptions.get("interval").toString()); + } + } + return 0; + } + + String getTimeZone() { + if (aggOptions.containsKey("time_zone")) { + return aggOptions.get("time_zone").toString(); + } + return null; + } + + String getField() { + if (aggOptions.containsKey("field")) { + return aggOptions.get("field").toString(); + } + return null; + } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 18f5e6dbb81e6..f33f733c338e7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -222,8 +222,8 @@ public void testPastAggConfigParse() throws IOException { try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) { - DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null); - ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build()); + DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build(); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedAggregations()); assertEquals( "Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. 
Found [0] in [airline]", e.getMessage()); @@ -399,7 +399,7 @@ public void testBuild_GivenHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build); - assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]")); + assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0")); } public void testBuild_GivenDateHistogramWithInvalidTimeZone() { @@ -416,7 +416,7 @@ public void testBuild_GivenDateHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> createDatafeedWithDateHistogram((String) null)); - assertThat(e.getMessage(), containsString("[interval] must be 1 or greater for histogram aggregation [buckets]")); + assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0")); } public void testBuild_GivenValidDateHistogram() { @@ -457,9 +457,9 @@ public void testChunkingConfig_GivenExplicitSetting() { public void testCheckHistogramAggregationHasChildMaxTimeAgg() { DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time_agg").field("max_time"); - - ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> DatafeedConfig.Builder.checkHistogramAggregationHasChildMaxTimeAgg(dateHistogram)); + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar"); + builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram)); + ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations); assertThat(e.getMessage(), containsString("Date histogram must have nested max aggregation for time_field [max_time]")); } @@ -478,8 +478,8 @@ public void testValidateAggregations_GivenMulitpleHistogramAggs() { toplevelTerms.subAggregation(dateHistogram); DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar"); - ElasticsearchException e = 
expectThrows(ElasticsearchException.class, - () -> builder.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms))); + builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms)); + ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations); assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage()); } From ad31ad24d809497c1196abbb5c491287d94a265a Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Sat, 1 Dec 2018 16:07:54 -0600 Subject: [PATCH 5/8] Adding tests, fixing up semi-parser --- .../core/ml/datafeed/DatafeedConfig.java | 39 +++++-- .../core/ml/datafeed/DatafeedConfigTests.java | 110 ++++++++++++++++++ 2 files changed, 137 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 8b91bed8f5b78..5f0b5a55bead2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -707,7 +708,7 @@ void validateAggregations() { if (histogramAggregation.getTimeZone() != null && histogramAggregation.getTimeZone().equals("UTC") == false) { throw ExceptionsHelper.badRequestException("ML requires date_histogram.time_zone to be UTC"); } - checkNoMoreHistogramAggregations(histogramAggregation.subAggregations); + 
checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations()); checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation); checkHistogramIntervalIsPositive(histogramAggregation); } @@ -721,16 +722,16 @@ private static void checkNoMoreHistogramAggregations(Map aggrega if (semiParsedAggregation.isHistogram()) { throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM); } - checkNoMoreHistogramAggregations(semiParsedAggregation.subAggregations); + checkNoMoreHistogramAggregations(semiParsedAggregation.getSubAggregations()); } } static void checkHistogramAggregationHasChildMaxTimeAgg(SemiParsedAggregation histogramAggregation) { String timeField = histogramAggregation.getField(); - for (Map.Entry agg : histogramAggregation.subAggregations.entrySet()) { + for (Map.Entry agg : histogramAggregation.getSubAggregations().entrySet()) { SemiParsedAggregation semiParsedAggregation = new SemiParsedAggregation(agg); - if (semiParsedAggregation.type != null && semiParsedAggregation.type.equals(MaxAggregationBuilder.NAME)) { + if (semiParsedAggregation.getType() != null && semiParsedAggregation.getType().equals(MaxAggregationBuilder.NAME)) { if (semiParsedAggregation.getField() != null && semiParsedAggregation.getField().equals(timeField)) { return; } @@ -795,9 +796,9 @@ private static SemiParsedAggregation getHistogramAggregation(Map * None of the regular Aggregation parsing validations are done from this class, only really used for ML specific validations */ static class SemiParsedAggregation { - final String name; - final String type; - final Map subAggregations = new HashMap<>(); + private final String name; + private final String type; + private final Map subAggregations = new HashMap<>(); private final Map aggOptions; /** @@ -805,23 +806,25 @@ static class SemiParsedAggregation { * fields used in our validations are readily available.
* * @param agg The single entry of the form {@code {"aggname":{"aggtype":{options}, "aggs|aggregations"{subaggs}}}} - * @throws IllegalArgumentException + * @throws IllegalArgumentException when agg is incorrectly formatted */ @SuppressWarnings("unchecked") SemiParsedAggregation(Map.Entry agg) { + String foundName = agg.getKey(); String foundType = null; Map foundAggOptions = new HashMap<>(); if (agg.getValue() instanceof Map == false) { - throw new IllegalArgumentException("Aggregation [" + agg.getKey() + "] definition incorrectly formatted"); + throw new IllegalArgumentException("Aggregation [" + foundName + "] definition incorrectly formatted"); } Map definition = (Map) agg.getValue(); for (Map.Entry entry : definition.entrySet()) { switch (entry.getKey()) { + // Get our sub-aggregations if there are any case "aggregations": case "aggs": if (entry.getValue() instanceof Map == false) { throw new IllegalArgumentException( - "Aggregation [" + agg.getKey() + "] sub aggregation definition incorrectly formatted"); + "Aggregation [" + foundName + "] sub aggregation definition incorrectly formatted"); } Map subAgg = (Map) entry.getValue(); subAgg.forEach((name, value) -> subAggregations.merge(name, value, (oldValue, newValue) -> newValue)); @@ -830,13 +833,13 @@ static class SemiParsedAggregation { break; default: if (entry.getValue() instanceof Map == false) { - throw new IllegalArgumentException("Aggregation [" + agg.getKey() + "] options incorrectly formatted"); + throw new IllegalArgumentException("Aggregation [" + foundName + "] options incorrectly formatted"); } foundType = entry.getKey(); foundAggOptions = (Map) entry.getValue(); } } - name = agg.getKey(); + name = foundName; type = foundType; aggOptions = foundAggOptions; } @@ -869,5 +872,17 @@ String getField() { } return null; } + + String getName() { + return name; + } + + String getType() { + return type; + } + + Map getSubAggregations() { + return subAggregations; + } } } diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index f33f733c338e7..f18b7c0d300c8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -41,15 +41,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TimeZone; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; public class DatafeedConfigTests extends AbstractSerializingTestCase { @@ -568,6 +573,111 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour() assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12))); } + public void testSemiParseAgg_badDefinition() { + Map.Entry agg = new HashMap.SimpleEntry<>("aggName", "NotAMap"); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new DatafeedConfig.SemiParsedAggregation(agg)); + assertThat(ex.getMessage(), equalTo("Aggregation [aggName] definition incorrectly formatted")); + } + + public void testSemiParseAgg_badSubAdd() { + Map.Entry agg = new HashMap.SimpleEntry<>("aggName", Collections.singletonMap("aggs", "notAMap")); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new DatafeedConfig.SemiParsedAggregation(agg)); + 
assertThat(ex.getMessage(), equalTo("Aggregation [aggName] sub aggregation definition incorrectly formatted")); + } + + public void testSemiParseAgg_badOptions() { + Map.Entry agg = new HashMap.SimpleEntry<>("aggName", Collections.singletonMap("max", "notAMap")); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new DatafeedConfig.SemiParsedAggregation(agg)); + assertThat(ex.getMessage(), equalTo("Aggregation [aggName] options incorrectly formatted")); + } + + @SuppressWarnings("unchecked") + public void testSemiParseAgg_ParseSimple() { + Map.Entry aggWithSubAgg = new HashMap.SimpleEntry<>("aggName", + Collections.singletonMap("max", + Collections.singletonMap("field", "myField"))); + + DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(aggWithSubAgg); + assertThat(semiParsedAggregation.getSubAggregations().isEmpty(), is(true)); + assertThat(semiParsedAggregation.getName(), equalTo("aggName")); + assertThat(semiParsedAggregation.getField(), is("myField")); + assertThat(semiParsedAggregation.getType(), equalTo("max")); + assertThat(semiParsedAggregation.isHistogram(), is(false)); + } + + @SuppressWarnings("unchecked") + public void testSemiParseAgg_ParseSimpleMax() { + Map subAgg = Collections.singletonMap("subAggName", + Collections.singletonMap("max", Collections.singletonMap("field", "myField"))); + + Map.Entry maxAggWithSubAgg = new HashMap.SimpleEntry<>("aggName", + new HashMap() {{ + put("meta", "ignored"); + put("aggregations", subAgg); + put("max", Collections.singletonMap("field", "myField")); + }}); + + DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(maxAggWithSubAgg); + assertThat(semiParsedAggregation.getSubAggregations(), equalTo(subAgg)); + assertThat(semiParsedAggregation.getName(), equalTo("aggName")); + assertThat(semiParsedAggregation.getType(), equalTo("max")); + assertThat(semiParsedAggregation.getField(), 
equalTo("myField")); + assertThat(semiParsedAggregation.isHistogram(), is(false)); + } + + @SuppressWarnings("unchecked") + public void testSemiParseAgg_ParseHistogram() { + Map subAgg = Collections.singletonMap("subAggName", + Collections.singletonMap("max", Collections.singletonMap("field", "myField"))); + + Map.Entry histogramAgg = new HashMap.SimpleEntry<>("aggName", + new HashMap() {{ + put("meta", "ignored"); + put("aggregations", subAgg); + put("histogram", + new HashMap() {{ + put("field", "myField"); + put("interval", Long.valueOf(100)); + }}); + }}); + + DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(histogramAgg); + assertThat(semiParsedAggregation.getSubAggregations(), equalTo(subAgg)); + assertThat(semiParsedAggregation.getName(), equalTo("aggName")); + assertThat(semiParsedAggregation.getType(), equalTo("histogram")); + assertThat(semiParsedAggregation.getField(), equalTo("myField")); + assertThat(semiParsedAggregation.getTimeZone(), is(nullValue())); + assertThat(semiParsedAggregation.getInterval(), equalTo(100L)); + assertThat(semiParsedAggregation.isHistogram(), is(true)); + } + + @SuppressWarnings("unchecked") + public void testSemiParseAgg_ParseDateHistogram() { + Map subAgg = Collections.singletonMap("subAggName", + Collections.singletonMap("max", Collections.singletonMap("field", "myField"))); + + Map.Entry dateHistogramAgg = new HashMap.SimpleEntry<>("aggName", + new HashMap() {{ + put("meta", "ignored"); + put("aggregations", subAgg); + put("date_histogram", + new HashMap() {{ + put("field", "myField"); + put("interval", "1h"); + put("time_zone", "UTC"); + }}); + }}); + + DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(dateHistogramAgg); + assertThat(semiParsedAggregation.getSubAggregations(), equalTo(subAgg)); + assertThat(semiParsedAggregation.getName(), equalTo("aggName")); + assertThat(semiParsedAggregation.getType(), 
equalTo("date_histogram")); + assertThat(semiParsedAggregation.getField(), equalTo("myField")); + assertThat(semiParsedAggregation.getTimeZone(), equalTo("UTC")); + assertThat(semiParsedAggregation.getInterval(), equalTo(3600000L)); + assertThat(semiParsedAggregation.isHistogram(), is(true)); + } + public static String randomValidDatafeedId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); From f46f613db410f83afec943e069c79a8806f1798a Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 3 Dec 2018 08:54:18 -0600 Subject: [PATCH 6/8] Reverting semi-parsing --- .../core/ml/datafeed/DatafeedConfig.java | 173 +++--------------- .../core/ml/datafeed/DatafeedConfigTests.java | 128 +------------ 2 files changed, 39 insertions(+), 262 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 5f0b5a55bead2..34bc59bf3ae34 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -10,7 +10,6 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; @@ -23,10 +22,10 @@ import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; -import 
org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -39,9 +38,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -322,7 +321,7 @@ public Map getAggregations() { * Returns the histogram's interval as epoch millis. */ public long getHistogramIntervalMillis() { - return Builder.getHistogramAggregation(aggregations).getInterval(); + return ExtractorUtils.getHistogramIntervalMillis(getParsedAggregations()); } /** @@ -684,15 +683,16 @@ public DatafeedConfig build() { throw invalidOptionValue(TYPES.getPreferredName(), types); } - validateAggregations(); - setDefaultChunkingConfig(); + AggregatorFactories.Builder parsedAggs = lazyAggParser.apply(aggregations, id); + validateAggregations(parsedAggs); + setDefaultChunkingConfig(parsedAggs); setDefaultQueryDelay(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, chunkingConfig, headers, delayedDataCheckConfig); } - void validateAggregations() { + void validateAggregations(AggregatorFactories.Builder aggregations) { if (aggregations == null) { return; } @@ -700,39 +700,36 @@ void validateAggregations() { throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS)); } - - SemiParsedAggregation histogramAggregation = 
getHistogramAggregation(aggregations); - if (histogramAggregation == null || histogramAggregation.isHistogram() == false) { + Collection aggregatorFactories = aggregations.getAggregatorFactories(); + if (aggregatorFactories.isEmpty()) { throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM); } - if (histogramAggregation.getTimeZone() != null && histogramAggregation.getTimeZone().equals("UTC") == false) { - throw ExceptionsHelper.badRequestException("ML requires date_histogram.time_zone to be UTC"); - } + + AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories); checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations()); checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation); checkHistogramIntervalIsPositive(histogramAggregation); } - private static void checkNoMoreHistogramAggregations(Map aggregations) { - if (aggregations == null || aggregations.isEmpty()) { - return; - } - for (Map.Entry agg : aggregations.entrySet()) { - SemiParsedAggregation semiParsedAggregation = new SemiParsedAggregation(agg); - if (semiParsedAggregation.isHistogram()) { + private static void checkNoMoreHistogramAggregations(Collection aggregations) { + for (AggregationBuilder agg : aggregations) { + if (ExtractorUtils.isHistogram(agg)) { throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM); } - checkNoMoreHistogramAggregations(semiParsedAggregation.getSubAggregations()); + checkNoMoreHistogramAggregations(agg.getSubAggregations()); } } - static void checkHistogramAggregationHasChildMaxTimeAgg(SemiParsedAggregation histogramAggregation) { - String timeField = histogramAggregation.getField(); + static void checkHistogramAggregationHasChildMaxTimeAgg(AggregationBuilder histogramAggregation) { + String timeField = null; + if (histogramAggregation instanceof ValuesSourceAggregationBuilder) { + timeField = 
((ValuesSourceAggregationBuilder) histogramAggregation).field(); + } - for (Map.Entry agg : histogramAggregation.getSubAggregations().entrySet()) { - SemiParsedAggregation semiParsedAggregation = new SemiParsedAggregation(agg); - if (semiParsedAggregation.getType() != null && semiParsedAggregation.getType().equals(MaxAggregationBuilder.NAME)) { - if (semiParsedAggregation.getField() != null && semiParsedAggregation.getField().equals(timeField)) { + for (AggregationBuilder agg : histogramAggregation.getSubAggregations()) { + if (agg instanceof MaxAggregationBuilder) { + MaxAggregationBuilder maxAgg = (MaxAggregationBuilder)agg; + if (maxAgg.field().equals(timeField)) { return; } } @@ -742,18 +739,19 @@ static void checkHistogramAggregationHasChildMaxTimeAgg(SemiParsedAggregation hi Messages.getMessage(Messages.DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION, timeField)); } - private static void checkHistogramIntervalIsPositive(SemiParsedAggregation histogramAggregation) { - if (histogramAggregation.getInterval() <= 0) { + private static void checkHistogramIntervalIsPositive(AggregationBuilder histogramAggregation) { + long interval = ExtractorUtils.getHistogramIntervalMillis(histogramAggregation); + if (interval <= 0) { throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO); } } - private void setDefaultChunkingConfig() { + private void setDefaultChunkingConfig(AggregatorFactories.Builder aggregations) { if (chunkingConfig == null) { if (aggregations == null) { chunkingConfig = ChunkingConfig.newAuto(); } else { - long histogramIntervalMillis = getHistogramAggregation(aggregations).getInterval(); + long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(aggregations); chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis( DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis)); } @@ -773,116 +771,5 @@ private static ElasticsearchException
invalidOptionValue(String fieldName, Objec String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value); throw ExceptionsHelper.badRequestException(msg); } - - private static SemiParsedAggregation getHistogramAggregation(Map aggregations) { - if (aggregations == null || aggregations.isEmpty()) { - return null; - } - if (aggregations.size() != 1) { - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM_NO_SIBLINGS); - } - - SemiParsedAggregation agg = new SemiParsedAggregation(aggregations.entrySet().iterator().next()); - if (agg.isHistogram()) { - return agg; - } - return getHistogramAggregation(agg.subAggregations); - } - } - - /** - * Class for Semi-parsing aggregations that are of the form {@code Map} - *

- * None of the regular Aggregation parsing validations are done from this class, only really used for ML specific validations - */ - static class SemiParsedAggregation { - private final String name; - private final String type; - private final Map subAggregations = new HashMap<>(); - private final Map aggOptions; - - /** - * Parse out a {@code Map.Entry} formatted aggregation into a semi-parsed Aggregation so that - * fields used in our validations are readily available. - * - * @param agg The single entry of the form {@code {"aggname":{"aggtype":{options}, "aggs|aggregations"{subaggs}}}} - * @throws IllegalArgumentException when agg is incorrectly formatted - */ - @SuppressWarnings("unchecked") - SemiParsedAggregation(Map.Entry agg) { - String foundName = agg.getKey(); - String foundType = null; - Map foundAggOptions = new HashMap<>(); - if (agg.getValue() instanceof Map == false) { - throw new IllegalArgumentException("Aggregation [" + foundName + "] definition incorrectly formatted"); - } - Map definition = (Map) agg.getValue(); - for (Map.Entry entry : definition.entrySet()) { - switch (entry.getKey()) { - // Get our sub-aggregations if there are any - case "aggregations": - case "aggs": - if (entry.getValue() instanceof Map == false) { - throw new IllegalArgumentException( - "Aggregation [" + foundName + "] sub aggregation definition incorrectly formatted"); - } - Map subAgg = (Map) entry.getValue(); - subAgg.forEach((name, value) -> subAggregations.merge(name, value, (oldValue, newValue) -> newValue)); - break; - case "meta": // we don't care about meta information - break; - default: - if (entry.getValue() instanceof Map == false) { - throw new IllegalArgumentException("Aggregation [" + foundName + "] options incorrectly formatted"); - } - foundType = entry.getKey(); - foundAggOptions = (Map) entry.getValue(); - } - } - name = foundName; - type = foundType; - aggOptions = foundAggOptions; - } - - boolean isHistogram() { - return type != null && 
(type.equals(HistogramAggregationBuilder.NAME) || type.equals(DateHistogramAggregationBuilder.NAME)); - } - - long getInterval() { - if (aggOptions.containsKey("interval")) { - if (aggOptions.get("interval") instanceof Number) { - return ((Number) aggOptions.get("interval")).longValue(); - } else { - return ExtractorUtils.validateAndGetCalendarInterval(aggOptions.get("interval").toString()); - } - } - return 0; - } - - String getTimeZone() { - if (aggOptions.containsKey("time_zone")) { - return aggOptions.get("time_zone").toString(); - } - return null; - } - - String getField() { - if (aggOptions.containsKey("field")) { - return aggOptions.get("field").toString(); - } - return null; - } - - String getName() { - return name; - } - - String getType() { - return type; - } - - Map getSubAggregations() { - return subAggregations; - } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index f18b7c0d300c8..18f5e6dbb81e6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -41,20 +41,15 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.TimeZone; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; public class DatafeedConfigTests extends 
AbstractSerializingTestCase { @@ -227,8 +222,8 @@ public void testPastAggConfigParse() throws IOException { try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) { - DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build(); - ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedAggregations()); + DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build()); assertEquals( "Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. Found [0] in [airline]", e.getMessage()); @@ -404,7 +399,7 @@ public void testBuild_GivenHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build); - assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0")); + assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]")); } public void testBuild_GivenDateHistogramWithInvalidTimeZone() { @@ -421,7 +416,7 @@ public void testBuild_GivenDateHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> createDatafeedWithDateHistogram((String) null)); - assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0")); + assertThat(e.getMessage(), containsString("[interval] must be 1 or greater for histogram aggregation [buckets]")); } public void testBuild_GivenValidDateHistogram() { @@ -462,9 +457,9 @@ public void testChunkingConfig_GivenExplicitSetting() { public void testCheckHistogramAggregationHasChildMaxTimeAgg() { DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("time_agg").field("max_time"); - 
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar"); - builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram)); - ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations); + + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> DatafeedConfig.Builder.checkHistogramAggregationHasChildMaxTimeAgg(dateHistogram)); assertThat(e.getMessage(), containsString("Date histogram must have nested max aggregation for time_field [max_time]")); } @@ -483,8 +478,8 @@ public void testValidateAggregations_GivenMulitpleHistogramAggs() { toplevelTerms.subAggregation(dateHistogram); DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar"); - builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms)); - ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations); + ElasticsearchException e = expectThrows(ElasticsearchException.class, + () -> builder.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms))); assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage()); } @@ -573,111 +568,6 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour() assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12))); } - public void testSemiParseAgg_badDefinition() { - Map.Entry agg = new HashMap.SimpleEntry<>("aggName", "NotAMap"); - IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new DatafeedConfig.SemiParsedAggregation(agg)); - assertThat(ex.getMessage(), equalTo("Aggregation [aggName] definition incorrectly formatted")); - } - - public void testSemiParseAgg_badSubAdd() { - Map.Entry agg = new HashMap.SimpleEntry<>("aggName", Collections.singletonMap("aggs", "notAMap")); - IllegalArgumentException ex = 
expectThrows(IllegalArgumentException.class, () -> new DatafeedConfig.SemiParsedAggregation(agg)); - assertThat(ex.getMessage(), equalTo("Aggregation [aggName] sub aggregation definition incorrectly formatted")); - } - - public void testSemiParseAgg_badOptions() { - Map.Entry agg = new HashMap.SimpleEntry<>("aggName", Collections.singletonMap("max", "notAMap")); - IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new DatafeedConfig.SemiParsedAggregation(agg)); - assertThat(ex.getMessage(), equalTo("Aggregation [aggName] options incorrectly formatted")); - } - - @SuppressWarnings("unchecked") - public void testSemiParseAgg_ParseSimple() { - Map.Entry aggWithSubAgg = new HashMap.SimpleEntry<>("aggName", - Collections.singletonMap("max", - Collections.singletonMap("field", "myField"))); - - DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(aggWithSubAgg); - assertThat(semiParsedAggregation.getSubAggregations().isEmpty(), is(true)); - assertThat(semiParsedAggregation.getName(), equalTo("aggName")); - assertThat(semiParsedAggregation.getField(), is("myField")); - assertThat(semiParsedAggregation.getType(), equalTo("max")); - assertThat(semiParsedAggregation.isHistogram(), is(false)); - } - - @SuppressWarnings("unchecked") - public void testSemiParseAgg_ParseSimpleMax() { - Map subAgg = Collections.singletonMap("subAggName", - Collections.singletonMap("max", Collections.singletonMap("field", "myField"))); - - Map.Entry maxAggWithSubAgg = new HashMap.SimpleEntry<>("aggName", - new HashMap() {{ - put("meta", "ignored"); - put("aggregations", subAgg); - put("max", Collections.singletonMap("field", "myField")); - }}); - - DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(maxAggWithSubAgg); - assertThat(semiParsedAggregation.getSubAggregations(), equalTo(subAgg)); - assertThat(semiParsedAggregation.getName(), equalTo("aggName")); - 
assertThat(semiParsedAggregation.getType(), equalTo("max")); - assertThat(semiParsedAggregation.getField(), equalTo("myField")); - assertThat(semiParsedAggregation.isHistogram(), is(false)); - } - - @SuppressWarnings("unchecked") - public void testSemiParseAgg_ParseHistogram() { - Map subAgg = Collections.singletonMap("subAggName", - Collections.singletonMap("max", Collections.singletonMap("field", "myField"))); - - Map.Entry histogramAgg = new HashMap.SimpleEntry<>("aggName", - new HashMap() {{ - put("meta", "ignored"); - put("aggregations", subAgg); - put("histogram", - new HashMap() {{ - put("field", "myField"); - put("interval", Long.valueOf(100)); - }}); - }}); - - DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(histogramAgg); - assertThat(semiParsedAggregation.getSubAggregations(), equalTo(subAgg)); - assertThat(semiParsedAggregation.getName(), equalTo("aggName")); - assertThat(semiParsedAggregation.getType(), equalTo("histogram")); - assertThat(semiParsedAggregation.getField(), equalTo("myField")); - assertThat(semiParsedAggregation.getTimeZone(), is(nullValue())); - assertThat(semiParsedAggregation.getInterval(), equalTo(100L)); - assertThat(semiParsedAggregation.isHistogram(), is(true)); - } - - @SuppressWarnings("unchecked") - public void testSemiParseAgg_ParseDateHistogram() { - Map subAgg = Collections.singletonMap("subAggName", - Collections.singletonMap("max", Collections.singletonMap("field", "myField"))); - - Map.Entry dateHistogramAgg = new HashMap.SimpleEntry<>("aggName", - new HashMap() {{ - put("meta", "ignored"); - put("aggregations", subAgg); - put("date_histogram", - new HashMap() {{ - put("field", "myField"); - put("interval", "1h"); - put("time_zone", "UTC"); - }}); - }}); - - DatafeedConfig.SemiParsedAggregation semiParsedAggregation = new DatafeedConfig.SemiParsedAggregation(dateHistogramAgg); - assertThat(semiParsedAggregation.getSubAggregations(), equalTo(subAgg)); - 
assertThat(semiParsedAggregation.getName(), equalTo("aggName")); - assertThat(semiParsedAggregation.getType(), equalTo("date_histogram")); - assertThat(semiParsedAggregation.getField(), equalTo("myField")); - assertThat(semiParsedAggregation.getTimeZone(), equalTo("UTC")); - assertThat(semiParsedAggregation.getInterval(), equalTo(3600000L)); - assertThat(semiParsedAggregation.isHistogram(), is(true)); - } - public static String randomValidDatafeedId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); From aab1360fcca5a541b1eaf1ea1a014d34aff0e2be Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 3 Dec 2018 10:22:07 -0600 Subject: [PATCH 7/8] Moving agg validations --- .../core/ml/datafeed/DatafeedConfig.java | 39 +++++++++++-------- .../core/ml/datafeed/DatafeedUpdate.java | 1 + .../core/ml/datafeed/DatafeedConfigTests.java | 9 +++-- .../ml/action/TransportPutDatafeedAction.java | 2 + .../action/TransportStartDatafeedAction.java | 1 + 5 files changed, 32 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 34bc59bf3ae34..71a2eb2114b3b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -133,6 +133,21 @@ public class DatafeedConfig extends AbstractDiffable implements public static final ObjectParser LENIENT_PARSER = createParser(true); public static final ObjectParser STRICT_PARSER = createParser(false); + public static void validateAggregations(AggregatorFactories.Builder aggregations) { + if (aggregations == null) { + return; + } + Collection aggregatorFactories = aggregations.getAggregatorFactories(); + if 
(aggregatorFactories.isEmpty()) { + throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM); + } + + AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories); + Builder.checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations()); + Builder.checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation); + Builder.checkHistogramIntervalIsPositive(histogramAggregation); + } + private static ObjectParser createParser(boolean ignoreUnknownFields) { ObjectParser parser = new ObjectParser<>("datafeed_config", ignoreUnknownFields, Builder::new); @@ -542,6 +557,8 @@ public static class Builder { private Map headers = Collections.emptyMap(); private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); + + public Builder() { try { this.query = QUERY_TRANSFORMER.toMap(QueryBuilders.matchAllQuery()); @@ -683,32 +700,22 @@ public DatafeedConfig build() { throw invalidOptionValue(TYPES.getPreferredName(), types); } - AggregatorFactories.Builder parsedAggs = lazyAggParser.apply(aggregations, id); - validateAggregations(parsedAggs); - setDefaultChunkingConfig(parsedAggs); + validateScriptFields(); + setDefaultChunkingConfig(); setDefaultQueryDelay(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, chunkingConfig, headers, delayedDataCheckConfig); } - void validateAggregations(AggregatorFactories.Builder aggregations) { + void validateScriptFields() { if (aggregations == null) { return; } if (scriptFields != null && !scriptFields.isEmpty()) { throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS)); + Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS)); } - Collection aggregatorFactories = aggregations.getAggregatorFactories(); - if 
(aggregatorFactories.isEmpty()) { - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM); - } - - AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories); - checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations()); - checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation); - checkHistogramIntervalIsPositive(histogramAggregation); } private static void checkNoMoreHistogramAggregations(Collection aggregations) { @@ -746,12 +753,12 @@ private static void checkHistogramIntervalIsPositive(AggregationBuilder histogra } } - private void setDefaultChunkingConfig(AggregatorFactories.Builder aggregations) { + private void setDefaultChunkingConfig() { if (chunkingConfig == null) { if (aggregations == null) { chunkingConfig = ChunkingConfig.newAuto(); } else { - long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(aggregations); + long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(lazyAggParser.apply(aggregations, id)); chunkingConfig = ChunkingConfig.newManual(TimeValue.timeValueMillis( DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis)); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index c1a975342651d..14bfbea475f01 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -298,6 +298,7 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map h builder.setParsedQuery(query); } if (aggregations != null) { + DatafeedConfig.validateAggregations(aggregations); builder.setParsedAggregations(aggregations); } if (scriptFields != null) { diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 18f5e6dbb81e6..2787f67952ad2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -416,7 +416,7 @@ public void testBuild_GivenDateHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> createDatafeedWithDateHistogram((String) null)); - assertThat(e.getMessage(), containsString("[interval] must be 1 or greater for histogram aggregation [buckets]")); + assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0")); } public void testBuild_GivenValidDateHistogram() { @@ -477,9 +477,8 @@ public void testValidateAggregations_GivenMulitpleHistogramAggs() { TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level"); toplevelTerms.subAggregation(dateHistogram); - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar"); ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> builder.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms))); + () -> DatafeedConfig.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms))); assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage()); } @@ -595,7 +594,9 @@ private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggre DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1"); builder.setIndices(Collections.singletonList("myIndex")); builder.setTypes(Collections.singletonList("myType")); - builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram)); + 
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(dateHistogram); + DatafeedConfig.validateAggregations(aggs); + builder.setParsedAggregations(aggs); return builder.build(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 4aabcb6114edd..ed6e4cc81aa08 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -154,6 +155,7 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ private void putDatafeed(PutDatafeedAction.Request request, Map headers, ActionListener listener) { + DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations()); clusterService.submitStateUpdateTask( "put-datafeed-" + request.getDatafeed().getId(), new AckedClusterStateUpdateTask(request, listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 6d37b08a7e8af..de0caee778e3b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -90,6 +90,7 @@ static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCu throw ExceptionsHelper.missingJobException(datafeed.getJobId()); } DatafeedJobValidator.validate(datafeed, job); + DatafeedConfig.validateAggregations(datafeed.getParsedAggregations()); JobState jobState = MlTasks.getJobState(datafeed.getJobId(), tasks); if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() + From 9d603905711460f8ed5145da57f3769cb1cdc14b Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Mon, 3 Dec 2018 13:49:23 -0600 Subject: [PATCH 8/8] Making bad configs throw badRequestException --- .../xpack/core/ml/datafeed/DatafeedConfig.java | 16 ++++++++-------- .../xpack/core/ml/utils/ExceptionsHelper.java | 8 -------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 71a2eb2114b3b..a6eea6e806028 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -71,13 +71,13 @@ public class DatafeedConfig extends AbstractDiffable implements } catch (IOException | XContentParseException exception) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user if (exception.getCause() instanceof IllegalArgumentException) { - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getCause().getMessage()), 
exception.getCause()); } else { - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, exception, id), exception); } @@ -91,13 +91,13 @@ public class DatafeedConfig extends AbstractDiffable implements } catch (IOException | XContentParseException exception) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user if (exception.getCause() instanceof IllegalArgumentException) { - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getCause().getMessage()), exception.getCause()); } else { - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, exception.getMessage(), id), exception); } @@ -623,13 +623,13 @@ public void setParsedQuery(QueryBuilder query) { } catch (IOException | XContentParseException exception) { if (exception.getCause() instanceof IllegalArgumentException) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getCause().getMessage()), exception.getCause()); } else { - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()), exception); } } @@ -645,13 +645,13 @@ public void setParsedAggregations(AggregatorFactories.Builder aggregations) { } catch (IOException | XContentParseException exception) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user 
if (exception.getCause() instanceof IllegalArgumentException) { - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getCause().getMessage()), exception.getCause()); } else { - throw ExceptionsHelper.unprocessableEntityException( + throw ExceptionsHelper.badRequestException( Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()), exception); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java index 0b3203cab4a6d..d5b83d25ce315 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java @@ -54,14 +54,6 @@ public static ElasticsearchStatusException badRequestException(String msg, Objec return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args); } - public static ElasticsearchStatusException unprocessableEntityException(String msg, Throwable cause, Object... args) { - return new ElasticsearchStatusException(msg, RestStatus.UNPROCESSABLE_ENTITY, cause, args); - } - - public static ElasticsearchStatusException unprocessableEntityException(String msg, Object... args) { - return new ElasticsearchStatusException(msg, RestStatus.UNPROCESSABLE_ENTITY, args); - } - /** * Creates an error message that explains there are shard failures, displays info * for the first failure (shard/reason) and kindly asks to see more info in the logs