From 0e816a5562b58d97ccc4ea36eaa1501d5f5ac08b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 25 Sep 2018 09:52:18 +0200 Subject: [PATCH 1/5] replace mocked configuration with one from job config --- ...nsportPutFeatureIndexBuilderJobAction.java | 8 +- ...sportStopFeatureIndexBuilderJobAction.java | 34 ++++---- .../job/AggregationConfig.java | 82 +++++++++++++++++++ .../job/FeatureIndexBuilderIndexer.java | 44 ++++------ .../job/FeatureIndexBuilderJobConfig.java | 74 +++++++++++++++-- .../job/FeatureIndexBuilderJobState.java | 1 - .../featureindexbuilder/job/SourceConfig.java | 55 +++++++++++++ ...tureIndexBuilderJobActionRequestTests.java | 3 +- ...erializingFeatureIndexBuilderTestCase.java | 49 +++++++++++ .../job/AggregationConfigTests.java | 60 ++++++++++++++ .../FeatureIndexBuilderJobConfigTests.java | 47 +++++++++++ 11 files changed, 400 insertions(+), 57 deletions(-) create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java index 37252bf82a410..7e28f68a23eda 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java @@ -37,9 +37,6 @@ public class TransportPutFeatureIndexBuilderJobAction extends TransportMasterNodeAction { - // TODO: hack, to be replaced - private static final String PIVOT_INDEX = "pivot-reviews"; - private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; private final Client client; @@ -76,7 +73,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); FeatureIndexBuilderJob job = createFeatureIndexBuilderJob(request.getConfig(), threadPool); - createIndex(client, job.getConfig().getId()); + createIndex(client, job.getConfig().getDestinationIndex()); startPersistentTask(job, listener, persistentTasksService); } @@ -105,9 +102,8 @@ protected ClusterBlockException checkBlock(PutFeatureIndexBuilderJobAction.Reque * * TODO: everything below will be replaced with proper implementation read from job configuration */ - private static void createIndex(Client client, String suffix) { + private static void createIndex(Client client, String indexName) { - String indexName = PIVOT_INDEX + "_" + suffix; CreateIndexRequest request = new CreateIndexRequest(indexName); request.settings(Settings.builder() // <1> diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java index 68a4fee37fa77..74e31e1bb4e29 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java @@ -23,26 +23,26 @@ import java.io.IOException; import java.util.List; -public class TransportStopFeatureIndexBuilderJobAction extends TransportTasksAction { - +public class TransportStopFeatureIndexBuilderJobAction extends + TransportTasksAction { @Inject - public TransportStopFeatureIndexBuilderJobAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, ClusterService clusterService) { + public TransportStopFeatureIndexBuilderJobAction(Settings settings, TransportService transportService, ActionFilters actionFilters, + ClusterService clusterService) { super(settings, StopFeatureIndexBuilderJobAction.NAME, clusterService, transportService, actionFilters, StopFeatureIndexBuilderJobAction.Request::new, StopFeatureIndexBuilderJobAction.Response::new, ThreadPool.Names.SAME); } @Override - protected void doExecute(Task task, StopFeatureIndexBuilderJobAction.Request request, ActionListener listener) { + protected void doExecute(Task task, StopFeatureIndexBuilderJobAction.Request request, + ActionListener listener) { super.doExecute(task, request, listener); } @Override - protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, - FeatureIndexBuilderJobTask jobTask, - ActionListener listener) { + protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, FeatureIndexBuilderJobTask jobTask, + ActionListener listener) { if (jobTask.getConfig().getId().equals(request.getId())) { jobTask.stop(listener); } else { @@ -52,19 +52,18 @@ protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, } @Override - protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndexBuilderJobAction.Request request, List tasks, - List taskOperationFailures, - List failedNodeExceptions) { + protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndexBuilderJobAction.Request request, + List tasks, List taskOperationFailures, + List failedNodeExceptions) { if (taskOperationFailures.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper - .convertToElastic(taskOperationFailures.get(0).getCause()); + throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); } else if (failedNodeExceptions.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper - .convertToElastic(failedNodeExceptions.get(0)); + throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); } - // Either the job doesn't exist (the user didn't create it yet) or was deleted after the Stop API executed. + // Either the job doesn't exist (the user didn't create it yet) or was deleted + // after the Stop API executed. // In either case, let the user know if (tasks.size() == 0) { throw new ResourceNotFoundException("Task for Feature Index Builder Job [" + request.getId() + "] not found"); @@ -80,5 +79,4 @@ protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndex protected StopFeatureIndexBuilderJobAction.Response readTaskResponse(StreamInput in) throws IOException { return new StopFeatureIndexBuilderJobAction.Response(in); } - } \ No newline at end of file diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java new file mode 100644 index 0000000000000..40cc3e103eea1 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/* + * Wrapper for the aggregations config part of a composite aggregation. + * + * For now just wraps aggregations from composite aggs. + * + */ +public class AggregationConfig implements Writeable, ToXContentObject { + + private final AggregatorFactories.Builder aggregatorFactoryBuilder; + + public AggregationConfig(AggregatorFactories.Builder aggregatorFactoryBuilder) { + this.aggregatorFactoryBuilder = aggregatorFactoryBuilder; + } + + AggregationConfig(final StreamInput in) throws IOException { + aggregatorFactoryBuilder = in.readOptionalWriteable(AggregatorFactories.Builder::new); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + /*for (AggregationBuilder subAgg : getAggregatorFactories()) { + subAgg.toXContent(builder, params); + } + return builder; + */ + + return aggregatorFactoryBuilder.toXContent(builder, params); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(aggregatorFactoryBuilder); + } + + public List getAggregatorFactories() { + return aggregatorFactoryBuilder.getAggregatorFactories(); + } + + public static AggregationConfig fromXContent(final XContentParser parser) throws IOException { + return new AggregationConfig(AggregatorFactories.parseAggregators(parser)); + } + + @Override + public int hashCode() { + return Objects.hash(aggregatorFactoryBuilder); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final AggregationConfig that = (AggregationConfig) other; + + return Objects.equals(this.aggregatorFactoryBuilder, that.aggregatorFactoryBuilder); + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index 998a295f1297e..3d6eb0faeed6d 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -13,12 +13,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; -import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; -import org.elasticsearch.search.aggregations.metrics.InternalAvg; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -37,8 +36,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, FeatureIndexBuilderJobStats> { - private static final String PIVOT_INDEX = "pivot-reviews"; - private static final String SOURCE_INDEX = "anonreviews"; private static final Logger logger = Logger.getLogger(FeatureIndexBuilderIndexer.class.getName()); private FeatureIndexBuilderJob job; @@ -71,21 +68,23 @@ protected IterationResult> doProcess(SearchResponse searchRe * TODO: replace with proper implementation */ private List processBuckets(CompositeAggregation agg) { + String destinationFieldName = job.getConfig().getSourceConfig().getSourceBuilder().name(); + String aggName = job.getConfig().getAggregationConfig().getAggregatorFactories().get(0).getName(); + return agg.getBuckets().stream().map(b -> { - InternalAvg avgAgg = b.getAggregations().get("avg_rating"); + NumericMetricsAggregation.SingleValue aggResult = b.getAggregations().get(aggName); XContentBuilder builder; try { builder = jsonBuilder(); - builder.startObject(); - builder.field("reviewerId", b.getKey().get("reviewerId")); - builder.field("avg_rating", avgAgg.getValue()); + builder.field(destinationFieldName, b.getKey().get(destinationFieldName)); + builder.field(aggName, aggResult.value()); builder.endObject(); } catch (IOException e) { throw new UncheckedIOException(e); } - String indexName = PIVOT_INDEX + "_" + job.getConfig().getId(); + String indexName = job.getConfig().getDestinationIndex(); IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder); return request; }).collect(Collectors.toList()); @@ -93,33 +92,26 @@ private List processBuckets(CompositeAggregation agg) { @Override protected SearchRequest buildSearchRequest() { - final Map position = getPosition(); - SearchRequest request = buildFeatureQuery(position); - return request; - } - /* - * Mocked demo case - * - * TODO: everything below will be replaced with proper implementation read from job configuration - */ - private static SearchRequest buildFeatureQuery(Map after) { QueryBuilder queryBuilder = new MatchAllQueryBuilder(); - SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX); + SearchRequest searchRequest = new SearchRequest(job.getConfig().getIndexPattern()); List> sources = new ArrayList<>(); - sources.add(new TermsValuesSourceBuilder("reviewerId").field("reviewerId")); + + sources.add(job.getConfig().getSourceConfig().getSourceBuilder()); CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources); compositeAggregation.size(1000); - if (after != null) { - compositeAggregation.aggregateAfter(after); + if (position != null) { + compositeAggregation.aggregateAfter(position); + } + + for (AggregationBuilder agg : job.getConfig().getAggregationConfig().getAggregatorFactories()) { + compositeAggregation.subAggregation(agg); } - compositeAggregation.subAggregation(AggregationBuilders.avg("avg_rating").field("rating")); - compositeAggregation.subAggregation(AggregationBuilders.cardinality("dc_vendors").field("vendorId")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.aggregation(compositeAggregation); sourceBuilder.size(0); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java index 645a97ab8d928..e3a1035b24fc5 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java @@ -16,9 +16,11 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; /** @@ -28,25 +30,55 @@ public class FeatureIndexBuilderJobConfig implements NamedWriteable, ToXContentO private static final String NAME = "xpack/feature_index_builder/jobconfig"; private static final ParseField ID = new ParseField("id"); + private static final ParseField INDEX_PATTERN = new ParseField("index_pattern"); + private static final ParseField DESTINATION_INDEX = new ParseField("destination_index"); + private static final ParseField SOURCES = new ParseField("sources"); + private static final ParseField AGGREGATIONS = new ParseField("aggregations"); private final String id; + private final String indexPattern; + private final String destinationIndex; + private final SourceConfig sourceConfig; + private final AggregationConfig aggregationConfig; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, false, (args, optionalId) -> { String id = args[0] != null ? (String) args[0] : optionalId; - return new FeatureIndexBuilderJobConfig(id); + String indexPattern = (String) args[1]; + String destinationIndex = (String) args[2]; + SourceConfig sourceConfig= (SourceConfig) args[3]; + AggregationConfig aggregationConfig = (AggregationConfig) args[4]; + return new FeatureIndexBuilderJobConfig(id, indexPattern, destinationIndex, sourceConfig, aggregationConfig); }); static { PARSER.declareString(optionalConstructorArg(), ID); + PARSER.declareString(constructorArg(), INDEX_PATTERN); + PARSER.declareString(constructorArg(), DESTINATION_INDEX); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> SourceConfig.fromXContent(p), SOURCES); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS); } - public FeatureIndexBuilderJobConfig(final String id) { - this.id = id; + public FeatureIndexBuilderJobConfig(final String id, + final String indexPattern, + final String destinationIndex, + final SourceConfig sourceConfig, + final AggregationConfig aggregationConfig) { + this.id = ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); + this.indexPattern = ExceptionsHelper.requireNonNull(indexPattern, INDEX_PATTERN.getPreferredName()); + this.destinationIndex = ExceptionsHelper.requireNonNull(destinationIndex, DESTINATION_INDEX.getPreferredName()); + + // TODO: check for null? + this.sourceConfig = sourceConfig; + this.aggregationConfig = aggregationConfig; } public FeatureIndexBuilderJobConfig(final StreamInput in) throws IOException { id = in.readString(); + indexPattern = in.readString(); + destinationIndex = in.readString(); + sourceConfig = in.readOptionalWriteable(SourceConfig::new); + aggregationConfig = in.readOptionalWriteable(AggregationConfig::new); } public String getId() { @@ -57,13 +89,41 @@ public String getCron() { return "*"; } + public String getIndexPattern() { + return indexPattern; + } + + public String getDestinationIndex() { + return destinationIndex; + } + + public SourceConfig getSourceConfig() { + return sourceConfig; + } + + public AggregationConfig getAggregationConfig() { + return aggregationConfig; + } + public void writeTo(final StreamOutput out) throws IOException { out.writeString(id); + out.writeString(indexPattern); + out.writeString(destinationIndex); + out.writeOptionalWriteable(sourceConfig); + out.writeOptionalWriteable(aggregationConfig); } public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); builder.field(ID.getPreferredName(), id); + builder.field(INDEX_PATTERN.getPreferredName(), indexPattern); + builder.field(DESTINATION_INDEX.getPreferredName(), destinationIndex); + if (sourceConfig != null) { + builder.field(SOURCES.getPreferredName(), sourceConfig); + } + if (aggregationConfig!=null) { + builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig); + } builder.endObject(); return builder; } @@ -85,12 +145,16 @@ public boolean equals(Object other) { final FeatureIndexBuilderJobConfig that = (FeatureIndexBuilderJobConfig) other; - return Objects.equals(this.id, that.id); + return Objects.equals(this.id, that.id) + && Objects.equals(this.indexPattern, that.indexPattern) + && Objects.equals(this.destinationIndex, that.destinationIndex) + && Objects.equals(this.sourceConfig, that.sourceConfig) + && Objects.equals(this.aggregationConfig, that.aggregationConfig); } @Override public int hashCode() { - return Objects.hash(id); + return Objects.hash(id, indexPattern, destinationIndex, sourceConfig, aggregationConfig); } @Override diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java index 580a3fe81ce30..49e387d6d98dc 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java @@ -66,7 +66,6 @@ public class FeatureIndexBuilderJobState implements Task.Status, PersistentTaskS public FeatureIndexBuilderJobState(IndexerState state, @Nullable Map position) { this.state = state; this.currentPosition = Collections.unmodifiableSortedMap(position == null ? null : new TreeMap<>(position)); - } public FeatureIndexBuilderJobState(StreamInput in) throws IOException { diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java new file mode 100644 index 0000000000000..86f223124ff44 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceParserHelper; + +import java.io.IOException; + +/* + * Wrapper for the Source config part of a composite aggregation. + * + * For now just wraps sources from composite aggs. + */ +public class SourceConfig implements Writeable, ToXContentObject { + private final CompositeValuesSourceBuilder sourceBuilder; + + SourceConfig(final StreamInput in) throws IOException { + sourceBuilder = CompositeValuesSourceParserHelper.readFrom(in); + } + + public SourceConfig(CompositeValuesSourceBuilder sourceBuilder) { + this.sourceBuilder = sourceBuilder; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return CompositeValuesSourceParserHelper.toXContent(sourceBuilder, builder, params); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + CompositeValuesSourceParserHelper.writeTo(sourceBuilder, out); + } + + public static SourceConfig fromXContent(final XContentParser parser) throws IOException { + CompositeValuesSourceBuilder builder = CompositeValuesSourceParserHelper.fromXContent(parser); + return new SourceConfig(builder); + } + + public CompositeValuesSourceBuilder getSourceBuilder() { + return sourceBuilder; + } + +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java index 14bedcbe909b1..3efed4c21e801 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.test.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.featureindexbuilder.action.PutFeatureIndexBuilderJobAction.Request; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfigTests; import org.junit.Before; import java.io.IOException; @@ -40,7 +41,7 @@ protected boolean supportsUnknownFields() { @Override protected Request createTestInstance() { - FeatureIndexBuilderJobConfig config = new FeatureIndexBuilderJobConfig(randomAlphaOfLengthBetween(1,10)); + FeatureIndexBuilderJobConfig config = FeatureIndexBuilderJobConfigTests.randomFeatureIndexBuilderJobConfig(); return new Request(config); } diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java new file mode 100644 index 0000000000000..ed2c419ed8421 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.junit.Before; + +import static java.util.Collections.emptyList; + +public abstract class AbstractSerializingFeatureIndexBuilderTestCase + extends AbstractSerializingTestCase { + + /** + * Test case that ensure aggregation named objects are registered + */ + + private NamedWriteableRegistry namedWriteableRegistry; + private NamedXContentRegistry namedXContentRegistry; + + @Before + public void setUp() throws Exception { + super.setUp(); + + // register aggregations as NamedWriteable + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); + namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); + namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents()); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return namedWriteableRegistry; + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return namedXContentRegistry; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java new file mode 100644 index 0000000000000..6adf459cb35fe --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import java.io.IOException; + +public class AggregationConfigTests extends AbstractSerializingFeatureIndexBuilderTestCase { + + public static AggregationConfig randonAggregationConfig() { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + + for (int i = 1; i < randomIntBetween(1, 20); ++i) { + builder.addAggregator(getRandomSupportedAggregation()); + } + + return new AggregationConfig(builder); + } + + @Override + protected AggregationConfig doParseInstance(XContentParser parser) throws IOException { + //parser.nextToken(); + + return AggregationConfig.fromXContent(parser); + } + + @Override + protected AggregationConfig createTestInstance() { + return randonAggregationConfig(); + } + + @Override + protected Reader instanceReader() { + return AggregationConfig::new; + } + + private static AggregationBuilder getRandomSupportedAggregation() { + final int numberOfSupportedAggs = 4; + switch (randomIntBetween(1, numberOfSupportedAggs)) { + case 1: + return AggregationBuilders.avg(randomAlphaOfLengthBetween(1, 10)); + case 2: + return AggregationBuilders.min(randomAlphaOfLengthBetween(1, 10)); + case 3: + return AggregationBuilders.max(randomAlphaOfLengthBetween(1, 10)); + case 4: + return AggregationBuilders.sum(randomAlphaOfLengthBetween(1, 10)); + } + + return null; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java new file mode 100644 index 0000000000000..6ab5aa52de1ca --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.junit.Before; + +import java.io.IOException; + +public class FeatureIndexBuilderJobConfigTests extends AbstractSerializingFeatureIndexBuilderTestCase { + + private String jobId; + + public static FeatureIndexBuilderJobConfig randomFeatureIndexBuilderJobConfig() { + return new FeatureIndexBuilderJobConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), + randomAlphaOfLengthBetween(1, 10), null, AggregationConfigTests.randonAggregationConfig()); + } + + @Before + public void setUpOptionalId() { + jobId = randomAlphaOfLengthBetween(1, 10); + } + + @Override + protected FeatureIndexBuilderJobConfig doParseInstance(XContentParser parser) throws IOException { + if (randomBoolean()) { + return FeatureIndexBuilderJobConfig.fromXContent(parser, jobId); + } else { + return FeatureIndexBuilderJobConfig.fromXContent(parser, null); + } + } + + @Override + protected FeatureIndexBuilderJobConfig createTestInstance() { + return randomFeatureIndexBuilderJobConfig(); + } + + @Override + protected Reader instanceReader() { + return FeatureIndexBuilderJobConfig::new; + } +} From c17d138225087addc06026158a93a06b02b91855 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 25 Sep 2018 11:13:37 +0200 Subject: [PATCH 2/5] disable AggregationConfig due to upstream bug --- .../featureindexbuilder/job/AggregationConfigTests.java | 9 +++++++-- .../job/FeatureIndexBuilderJobConfigTests.java | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java index 6adf459cb35fe..ba91372f1549a 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.featureindexbuilder.job; +import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.AggregationBuilder; @@ -13,6 +14,10 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import java.io.IOException; +import static org.hamcrest.Matchers.equalTo; + +// broken upstream +@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/33942") public class AggregationConfigTests extends AbstractSerializingFeatureIndexBuilderTestCase { public static AggregationConfig randonAggregationConfig() { @@ -27,8 +32,8 @@ public static AggregationConfig randonAggregationConfig() { @Override protected AggregationConfig doParseInstance(XContentParser parser) throws IOException { - //parser.nextToken(); - + // parseAggregators expects to be already inside the xcontent object + assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT)); return AggregationConfig.fromXContent(parser); } diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java index 6ab5aa52de1ca..83b786335bf9f 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java @@ -17,8 +17,9 @@ public class FeatureIndexBuilderJobConfigTests extends AbstractSerializingFeatur private String jobId; public static FeatureIndexBuilderJobConfig randomFeatureIndexBuilderJobConfig() { + // AggregationConfig disabled, see: https://github.com/elastic/elasticsearch/pull/33942 return new FeatureIndexBuilderJobConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), null, AggregationConfigTests.randonAggregationConfig()); + randomAlphaOfLengthBetween(1, 10), null, null /* AggregationConfigTests.randonAggregationConfig() */); } @Before From b1efc7bf4cc6eea376e4f8612abd9ea33d7245a3 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 25 Sep 2018 14:08:12 +0200 Subject: [PATCH 3/5] add tests for SourceConfig --- .../featureindexbuilder/job/SourceConfig.java | 76 ++++++++++++++++--- .../FeatureIndexBuilderJobConfigTests.java | 3 +- .../job/SourceConfigTests.java | 59 ++++++++++++++ 3 files changed, 127 insertions(+), 11 deletions(-) create mode 100644 x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfigTests.java diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java index 86f223124ff44..0f077d3076c56 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java @@ -9,13 +9,21 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceParserHelper; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; /* * Wrapper for the Source config part of a composite aggregation. @@ -23,33 +31,81 @@ * For now just wraps sources from composite aggs. */ public class SourceConfig implements Writeable, ToXContentObject { - private final CompositeValuesSourceBuilder sourceBuilder; + + private static final String NAME = "feature_index_builder_source"; + + private List> sources; + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, false, (args) -> { + @SuppressWarnings("unchecked") + List> sources = (List>) args[0]; + return new SourceConfig(sources); + }); + + static { + PARSER.declareFieldArray(constructorArg(), (parser, builder) -> CompositeValuesSourceParserHelper.fromXContent(parser), + CompositeAggregationBuilder.SOURCES_FIELD_NAME, ObjectParser.ValueType.OBJECT_ARRAY); + } SourceConfig(final StreamInput in) throws IOException { - sourceBuilder = CompositeValuesSourceParserHelper.readFrom(in); + int num = in.readVInt(); + this.sources = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + CompositeValuesSourceBuilder builder = CompositeValuesSourceParserHelper.readFrom(in); + sources.add(builder); + } } - public SourceConfig(CompositeValuesSourceBuilder sourceBuilder) { - this.sourceBuilder = sourceBuilder; + public SourceConfig(List> sources) { + this.sources = sources; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return CompositeValuesSourceParserHelper.toXContent(sourceBuilder, builder, params); + builder.startObject(); + builder.startArray(CompositeAggregationBuilder.SOURCES_FIELD_NAME.getPreferredName()); + for (CompositeValuesSourceBuilder source : sources) { + CompositeValuesSourceParserHelper.toXContent(source, builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; } @Override public void writeTo(StreamOutput out) throws IOException { - CompositeValuesSourceParserHelper.writeTo(sourceBuilder, out); + out.writeVInt(sources.size()); + for (CompositeValuesSourceBuilder builder : sources) { + CompositeValuesSourceParserHelper.writeTo(builder, out); + } + } + + @Override + public int hashCode() { + return Objects.hash(sources); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final SourceConfig that = (SourceConfig) other; + + return Objects.equals(this.sources, that.sources); } public static SourceConfig fromXContent(final XContentParser parser) throws IOException { - CompositeValuesSourceBuilder builder = CompositeValuesSourceParserHelper.fromXContent(parser); - return new SourceConfig(builder); + return PARSER.parse(parser, null); } + // to be fixed public CompositeValuesSourceBuilder getSourceBuilder() { - return sourceBuilder; + return sources.get(0); } - } diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java index 83b786335bf9f..847aa93e8ee31 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java @@ -19,7 +19,8 @@ public class FeatureIndexBuilderJobConfigTests extends AbstractSerializingFeatur public static FeatureIndexBuilderJobConfig randomFeatureIndexBuilderJobConfig() { // AggregationConfig disabled, see: https://github.com/elastic/elasticsearch/pull/33942 return new FeatureIndexBuilderJobConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), null, null /* AggregationConfigTests.randonAggregationConfig() */); + randomAlphaOfLengthBetween(1, 10), SourceConfigTests.randomSourceConfig(), + null /* AggregationConfigTests.randonAggregationConfig() */); } @Before diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfigTests.java new file mode 100644 index 0000000000000..e94754c825d27 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfigTests.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class SourceConfigTests extends AbstractSerializingFeatureIndexBuilderTestCase { + + public static SourceConfig randomSourceConfig() { + int numSources = randomIntBetween(1, 10); + List> sources = new ArrayList<>(); + for (int i = 0; i < numSources; i++) { + sources.add(randomTermsSourceBuilder()); + } + return new SourceConfig(sources); + } + + @Override + protected SourceConfig doParseInstance(XContentParser parser) throws IOException { + return SourceConfig.fromXContent(parser); + } + + @Override + protected SourceConfig createTestInstance() { + return randomSourceConfig(); + } + + @Override + protected Reader instanceReader() { + return SourceConfig::new; + } + + private static TermsValuesSourceBuilder randomTermsSourceBuilder() { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10)); + if (randomBoolean()) { + terms.field(randomAlphaOfLengthBetween(1, 20)); + } else { + terms.script(new Script(randomAlphaOfLengthBetween(10, 20))); + } + terms.order(randomFrom(SortOrder.values())); + if (randomBoolean()) { + terms.missingBucket(true); + } + return terms; + } +} From 7dc55e86af1103256dc3c08dc0a7557697637c71 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 25 Sep 2018 14:22:12 +0200 Subject: [PATCH 4/5] cleanup SourceConfig --- .../job/AggregationConfig.java | 6 ------ .../job/FeatureIndexBuilderIndexer.java | 8 +++----- .../featureindexbuilder/job/SourceConfig.java | 19 +++++++++---------- 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java index 40cc3e103eea1..673d051911b68 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java @@ -38,12 +38,6 @@ public AggregationConfig(AggregatorFactories.Builder aggregatorFactoryBuilder) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - /*for (AggregationBuilder subAgg : getAggregatorFactories()) { - subAgg.toXContent(builder, params); - } - return builder; - */ - return aggregatorFactoryBuilder.toXContent(builder, params); } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index 3d6eb0faeed6d..015b73c732eaf 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -68,7 +67,8 @@ protected IterationResult> doProcess(SearchResponse searchRe * TODO: replace with proper implementation */ private List processBuckets(CompositeAggregation agg) { - String destinationFieldName = job.getConfig().getSourceConfig().getSourceBuilder().name(); + // for now only 1 source supported + String destinationFieldName = job.getConfig().getSourceConfig().getSources().get(0).name(); String aggName = job.getConfig().getAggregationConfig().getAggregatorFactories().get(0).getName(); return agg.getBuckets().stream().map(b -> { @@ -97,9 +97,7 @@ protected SearchRequest buildSearchRequest() { QueryBuilder queryBuilder = new MatchAllQueryBuilder(); SearchRequest searchRequest = new SearchRequest(job.getConfig().getIndexPattern()); - List> sources = new ArrayList<>(); - - sources.add(job.getConfig().getSourceConfig().getSourceBuilder()); + List> sources = job.getConfig().getSourceConfig().getSources(); CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources); compositeAggregation.size(1000); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java index 0f077d3076c56..4cb8cdf784c4f 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java @@ -34,7 +34,7 @@ public class SourceConfig implements Writeable, ToXContentObject { private static final String NAME = "feature_index_builder_source"; - private List> sources; + private final List> sources; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, false, (args) -> { @SuppressWarnings("unchecked") @@ -52,7 +52,7 @@ public class SourceConfig implements Writeable, ToXContentObject { this.sources = new ArrayList<>(num); for (int i = 0; i < num; i++) { CompositeValuesSourceBuilder builder = CompositeValuesSourceParserHelper.readFrom(in); - sources.add(builder); + getSources().add(builder); } } @@ -64,7 +64,7 @@ public SourceConfig(List> sources) { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.startArray(CompositeAggregationBuilder.SOURCES_FIELD_NAME.getPreferredName()); - for (CompositeValuesSourceBuilder source : sources) { + for (CompositeValuesSourceBuilder source : getSources()) { CompositeValuesSourceParserHelper.toXContent(source, builder, params); } builder.endArray(); @@ -74,15 +74,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(sources.size()); - for (CompositeValuesSourceBuilder builder : sources) { + out.writeVInt(getSources().size()); + for (CompositeValuesSourceBuilder builder : getSources()) { CompositeValuesSourceParserHelper.writeTo(builder, out); } } @Override public int hashCode() { - return Objects.hash(sources); + return Objects.hash(getSources()); } @Override @@ -97,15 +97,14 @@ public boolean equals(Object other) { final SourceConfig that = (SourceConfig) other; - return Objects.equals(this.sources, that.sources); + return Objects.equals(this.getSources(), that.getSources()); } public static SourceConfig fromXContent(final XContentParser parser) throws IOException { return PARSER.parse(parser, null); } - // to be fixed - public CompositeValuesSourceBuilder getSourceBuilder() { - return sources.get(0); + public List> getSources() { + return sources; } } From 446d4454c65cb6ffe90d8820e448f7351c1dd975 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 25 Sep 2018 17:00:10 +0200 Subject: [PATCH 5/5] address review comments --- .../action/TransportStopFeatureIndexBuilderJobAction.java | 5 +++-- .../xpack/ml/featureindexbuilder/job/AggregationConfig.java | 4 ++-- .../xpack/ml/featureindexbuilder/job/SourceConfig.java | 6 ++++-- .../job/AbstractSerializingFeatureIndexBuilderTestCase.java | 4 +--- .../ml/featureindexbuilder/job/AggregationConfigTests.java | 6 +++--- 5 files changed, 13 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java index 74e31e1bb4e29..938e3d71a4f9c 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.featureindexbuilder.action; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -57,9 +58,9 @@ protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndex List failedNodeExceptions) { if (taskOperationFailures.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); + throw ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); } else if (failedNodeExceptions.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); + throw ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); } // Either the job doesn't exist (the user didn't create it yet) or was deleted diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java index 673d051911b68..d124453c07893 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java @@ -33,7 +33,7 @@ public AggregationConfig(AggregatorFactories.Builder aggregatorFactoryBuilder) { } AggregationConfig(final StreamInput in) throws IOException { - aggregatorFactoryBuilder = in.readOptionalWriteable(AggregatorFactories.Builder::new); + aggregatorFactoryBuilder = new AggregatorFactories.Builder(in); } @Override @@ -43,7 +43,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(aggregatorFactoryBuilder); + aggregatorFactoryBuilder.writeTo(out); } public List getAggregatorFactories() { diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java index 4cb8cdf784c4f..42e5634fc4bc4 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -49,15 +50,16 @@ public class SourceConfig implements Writeable, ToXContentObject { SourceConfig(final StreamInput in) throws IOException { int num = in.readVInt(); - this.sources = new ArrayList<>(num); + List> sources = new ArrayList<>(num); for (int i = 0; i < num; i++) { CompositeValuesSourceBuilder builder = CompositeValuesSourceParserHelper.readFrom(in); getSources().add(builder); } + this.sources = Collections.unmodifiableList(sources); } public SourceConfig(List> sources) { - this.sources = sources; + this.sources = Collections.unmodifiableList(sources); } @Override diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java index ed2c419ed8421..c2ed9e3d4e281 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java @@ -28,9 +28,7 @@ public abstract class AbstractSerializingFeatureIndexBuilderTestCase { - public static AggregationConfig randonAggregationConfig() { + public static AggregationConfig randomAggregationConfig() { AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); - for (int i = 1; i < randomIntBetween(1, 20); ++i) { + for (int i = 0; i < randomIntBetween(1, 20); ++i) { builder.addAggregator(getRandomSupportedAggregation()); } @@ -39,7 +39,7 @@ protected AggregationConfig doParseInstance(XContentParser parser) throws IOExce @Override protected AggregationConfig createTestInstance() { - return randonAggregationConfig(); + return randomAggregationConfig(); } @Override