diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java index 37252bf82a410..7e28f68a23eda 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportPutFeatureIndexBuilderJobAction.java @@ -37,9 +37,6 @@ public class TransportPutFeatureIndexBuilderJobAction extends TransportMasterNodeAction { - // TODO: hack, to be replaced - private static final String PIVOT_INDEX = "pivot-reviews"; - private final XPackLicenseState licenseState; private final PersistentTasksService persistentTasksService; private final Client client; @@ -76,7 +73,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); FeatureIndexBuilderJob job = createFeatureIndexBuilderJob(request.getConfig(), threadPool); - createIndex(client, job.getConfig().getId()); + createIndex(client, job.getConfig().getDestinationIndex()); startPersistentTask(job, listener, persistentTasksService); } @@ -105,9 +102,8 @@ protected ClusterBlockException checkBlock(PutFeatureIndexBuilderJobAction.Reque * * TODO: everything below will be replaced with proper implementation read from job configuration */ - private static void createIndex(Client client, String suffix) { + private static void createIndex(Client client, String indexName) { - String indexName = PIVOT_INDEX + "_" + suffix; CreateIndexRequest request = new CreateIndexRequest(indexName); request.settings(Settings.builder() // <1> diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java index 68a4fee37fa77..938e3d71a4f9c 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.featureindexbuilder.action; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -23,26 +24,26 @@ import java.io.IOException; import java.util.List; -public class TransportStopFeatureIndexBuilderJobAction extends TransportTasksAction { - +public class TransportStopFeatureIndexBuilderJobAction extends + TransportTasksAction { @Inject - public TransportStopFeatureIndexBuilderJobAction(Settings settings, TransportService transportService, - ActionFilters actionFilters, ClusterService clusterService) { + public TransportStopFeatureIndexBuilderJobAction(Settings settings, TransportService transportService, ActionFilters actionFilters, + ClusterService clusterService) { super(settings, StopFeatureIndexBuilderJobAction.NAME, clusterService, transportService, actionFilters, StopFeatureIndexBuilderJobAction.Request::new, StopFeatureIndexBuilderJobAction.Response::new, ThreadPool.Names.SAME); } @Override - protected void doExecute(Task task, StopFeatureIndexBuilderJobAction.Request request, ActionListener listener) { + protected void doExecute(Task task, StopFeatureIndexBuilderJobAction.Request request, + ActionListener listener) { super.doExecute(task, request, listener); } @Override - protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, - FeatureIndexBuilderJobTask jobTask, - ActionListener listener) { + protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, FeatureIndexBuilderJobTask jobTask, + ActionListener listener) { if (jobTask.getConfig().getId().equals(request.getId())) { jobTask.stop(listener); } else { @@ -52,19 +53,18 @@ protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, } @Override - protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndexBuilderJobAction.Request request, List tasks, - List taskOperationFailures, - List failedNodeExceptions) { + protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndexBuilderJobAction.Request request, + List tasks, List taskOperationFailures, + List failedNodeExceptions) { if (taskOperationFailures.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper - .convertToElastic(taskOperationFailures.get(0).getCause()); + throw ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause()); } else if (failedNodeExceptions.isEmpty() == false) { - throw org.elasticsearch.ExceptionsHelper - .convertToElastic(failedNodeExceptions.get(0)); + throw ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0)); } - // Either the job doesn't exist (the user didn't create it yet) or was deleted after the Stop API executed. + // Either the job doesn't exist (the user didn't create it yet) or was deleted + // after the Stop API executed. // In either case, let the user know if (tasks.size() == 0) { throw new ResourceNotFoundException("Task for Feature Index Builder Job [" + request.getId() + "] not found"); @@ -80,5 +80,4 @@ protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndex protected StopFeatureIndexBuilderJobAction.Response readTaskResponse(StreamInput in) throws IOException { return new StopFeatureIndexBuilderJobAction.Response(in); } - } \ No newline at end of file diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java new file mode 100644 index 0000000000000..d124453c07893 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfig.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/* + * Wrapper for the aggregations config part of a composite aggregation. + * + * For now just wraps aggregations from composite aggs. + * + */ +public class AggregationConfig implements Writeable, ToXContentObject { + + private final AggregatorFactories.Builder aggregatorFactoryBuilder; + + public AggregationConfig(AggregatorFactories.Builder aggregatorFactoryBuilder) { + this.aggregatorFactoryBuilder = aggregatorFactoryBuilder; + } + + AggregationConfig(final StreamInput in) throws IOException { + aggregatorFactoryBuilder = new AggregatorFactories.Builder(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return aggregatorFactoryBuilder.toXContent(builder, params); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + aggregatorFactoryBuilder.writeTo(out); + } + + public List getAggregatorFactories() { + return aggregatorFactoryBuilder.getAggregatorFactories(); + } + + public static AggregationConfig fromXContent(final XContentParser parser) throws IOException { + return new AggregationConfig(AggregatorFactories.parseAggregators(parser)); + } + + @Override + public int hashCode() { + return Objects.hash(aggregatorFactoryBuilder); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final AggregationConfig that = (AggregationConfig) other; + + return Objects.equals(this.aggregatorFactoryBuilder, that.aggregatorFactoryBuilder); + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index 998a295f1297e..015b73c732eaf 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -13,12 +13,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; -import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; -import org.elasticsearch.search.aggregations.metrics.InternalAvg; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -26,7 +25,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -37,8 +35,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, FeatureIndexBuilderJobStats> { - private static final String PIVOT_INDEX = "pivot-reviews"; - private static final String SOURCE_INDEX = "anonreviews"; private static final Logger logger = Logger.getLogger(FeatureIndexBuilderIndexer.class.getName()); private FeatureIndexBuilderJob job; @@ -71,21 +67,24 @@ protected IterationResult> doProcess(SearchResponse searchRe * TODO: replace with proper implementation */ private List processBuckets(CompositeAggregation agg) { + // for now only 1 source supported + String destinationFieldName = job.getConfig().getSourceConfig().getSources().get(0).name(); + String aggName = job.getConfig().getAggregationConfig().getAggregatorFactories().get(0).getName(); + return agg.getBuckets().stream().map(b -> { - InternalAvg avgAgg = b.getAggregations().get("avg_rating"); + NumericMetricsAggregation.SingleValue aggResult = b.getAggregations().get(aggName); XContentBuilder builder; try { builder = jsonBuilder(); - builder.startObject(); - builder.field("reviewerId", b.getKey().get("reviewerId")); - builder.field("avg_rating", avgAgg.getValue()); + builder.field(destinationFieldName, b.getKey().get(destinationFieldName)); + builder.field(aggName, aggResult.value()); builder.endObject(); } catch (IOException e) { throw new UncheckedIOException(e); } - String indexName = PIVOT_INDEX + "_" + job.getConfig().getId(); + String indexName = job.getConfig().getDestinationIndex(); IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder); return request; }).collect(Collectors.toList()); @@ -93,33 +92,24 @@ private List processBuckets(CompositeAggregation agg) { @Override protected SearchRequest buildSearchRequest() { - final Map position = getPosition(); - SearchRequest request = buildFeatureQuery(position); - return request; - } - /* - * Mocked demo case - * - * TODO: everything below will be replaced with proper implementation read from job configuration - */ - private static SearchRequest buildFeatureQuery(Map after) { QueryBuilder queryBuilder = new MatchAllQueryBuilder(); - SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX); + SearchRequest searchRequest = new SearchRequest(job.getConfig().getIndexPattern()); - List> sources = new ArrayList<>(); - sources.add(new TermsValuesSourceBuilder("reviewerId").field("reviewerId")); + List> sources = job.getConfig().getSourceConfig().getSources(); CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources); compositeAggregation.size(1000); - if (after != null) { - compositeAggregation.aggregateAfter(after); + if (position != null) { + compositeAggregation.aggregateAfter(position); + } + + for (AggregationBuilder agg : job.getConfig().getAggregationConfig().getAggregatorFactories()) { + compositeAggregation.subAggregation(agg); } - compositeAggregation.subAggregation(AggregationBuilders.avg("avg_rating").field("rating")); - compositeAggregation.subAggregation(AggregationBuilders.cardinality("dc_vendors").field("vendorId")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.aggregation(compositeAggregation); sourceBuilder.size(0); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java index 645a97ab8d928..e3a1035b24fc5 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfig.java @@ -16,9 +16,11 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; /** @@ -28,25 +30,55 @@ public class FeatureIndexBuilderJobConfig implements NamedWriteable, ToXContentO private static final String NAME = "xpack/feature_index_builder/jobconfig"; private static final ParseField ID = new ParseField("id"); + private static final ParseField INDEX_PATTERN = new ParseField("index_pattern"); + private static final ParseField DESTINATION_INDEX = new ParseField("destination_index"); + private static final ParseField SOURCES = new ParseField("sources"); + private static final ParseField AGGREGATIONS = new ParseField("aggregations"); private final String id; + private final String indexPattern; + private final String destinationIndex; + private final SourceConfig sourceConfig; + private final AggregationConfig aggregationConfig; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, false, (args, optionalId) -> { String id = args[0] != null ? (String) args[0] : optionalId; - return new FeatureIndexBuilderJobConfig(id); + String indexPattern = (String) args[1]; + String destinationIndex = (String) args[2]; + SourceConfig sourceConfig= (SourceConfig) args[3]; + AggregationConfig aggregationConfig = (AggregationConfig) args[4]; + return new FeatureIndexBuilderJobConfig(id, indexPattern, destinationIndex, sourceConfig, aggregationConfig); }); static { PARSER.declareString(optionalConstructorArg(), ID); + PARSER.declareString(constructorArg(), INDEX_PATTERN); + PARSER.declareString(constructorArg(), DESTINATION_INDEX); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> SourceConfig.fromXContent(p), SOURCES); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS); } - public FeatureIndexBuilderJobConfig(final String id) { - this.id = id; + public FeatureIndexBuilderJobConfig(final String id, + final String indexPattern, + final String destinationIndex, + final SourceConfig sourceConfig, + final AggregationConfig aggregationConfig) { + this.id = ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); + this.indexPattern = ExceptionsHelper.requireNonNull(indexPattern, INDEX_PATTERN.getPreferredName()); + this.destinationIndex = ExceptionsHelper.requireNonNull(destinationIndex, DESTINATION_INDEX.getPreferredName()); + + // TODO: check for null? + this.sourceConfig = sourceConfig; + this.aggregationConfig = aggregationConfig; } public FeatureIndexBuilderJobConfig(final StreamInput in) throws IOException { id = in.readString(); + indexPattern = in.readString(); + destinationIndex = in.readString(); + sourceConfig = in.readOptionalWriteable(SourceConfig::new); + aggregationConfig = in.readOptionalWriteable(AggregationConfig::new); } public String getId() { @@ -57,13 +89,41 @@ public String getCron() { return "*"; } + public String getIndexPattern() { + return indexPattern; + } + + public String getDestinationIndex() { + return destinationIndex; + } + + public SourceConfig getSourceConfig() { + return sourceConfig; + } + + public AggregationConfig getAggregationConfig() { + return aggregationConfig; + } + public void writeTo(final StreamOutput out) throws IOException { out.writeString(id); + out.writeString(indexPattern); + out.writeString(destinationIndex); + out.writeOptionalWriteable(sourceConfig); + out.writeOptionalWriteable(aggregationConfig); } public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); builder.field(ID.getPreferredName(), id); + builder.field(INDEX_PATTERN.getPreferredName(), indexPattern); + builder.field(DESTINATION_INDEX.getPreferredName(), destinationIndex); + if (sourceConfig != null) { + builder.field(SOURCES.getPreferredName(), sourceConfig); + } + if (aggregationConfig!=null) { + builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig); + } builder.endObject(); return builder; } @@ -85,12 +145,16 @@ public boolean equals(Object other) { final FeatureIndexBuilderJobConfig that = (FeatureIndexBuilderJobConfig) other; - return Objects.equals(this.id, that.id); + return Objects.equals(this.id, that.id) + && Objects.equals(this.indexPattern, that.indexPattern) + && Objects.equals(this.destinationIndex, that.destinationIndex) + && Objects.equals(this.sourceConfig, that.sourceConfig) + && Objects.equals(this.aggregationConfig, that.aggregationConfig); } @Override public int hashCode() { - return Objects.hash(id); + return Objects.hash(id, indexPattern, destinationIndex, sourceConfig, aggregationConfig); } @Override diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java index 580a3fe81ce30..49e387d6d98dc 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java @@ -66,7 +66,6 @@ public class FeatureIndexBuilderJobState implements Task.Status, PersistentTaskS public FeatureIndexBuilderJobState(IndexerState state, @Nullable Map position) { this.state = state; this.currentPosition = Collections.unmodifiableSortedMap(position == null ? null : new TreeMap<>(position)); - } public FeatureIndexBuilderJobState(StreamInput in) throws IOException { diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java new file mode 100644 index 0000000000000..42e5634fc4bc4 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfig.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceParserHelper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +/* + * Wrapper for the Source config part of a composite aggregation. + * + * For now just wraps sources from composite aggs. + */ +public class SourceConfig implements Writeable, ToXContentObject { + + private static final String NAME = "feature_index_builder_source"; + + private final List> sources; + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, false, (args) -> { + @SuppressWarnings("unchecked") + List> sources = (List>) args[0]; + return new SourceConfig(sources); + }); + + static { + PARSER.declareFieldArray(constructorArg(), (parser, builder) -> CompositeValuesSourceParserHelper.fromXContent(parser), + CompositeAggregationBuilder.SOURCES_FIELD_NAME, ObjectParser.ValueType.OBJECT_ARRAY); + } + + SourceConfig(final StreamInput in) throws IOException { + int num = in.readVInt(); + List> sources = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + CompositeValuesSourceBuilder builder = CompositeValuesSourceParserHelper.readFrom(in); + getSources().add(builder); + } + this.sources = Collections.unmodifiableList(sources); + } + + public SourceConfig(List> sources) { + this.sources = Collections.unmodifiableList(sources); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray(CompositeAggregationBuilder.SOURCES_FIELD_NAME.getPreferredName()); + for (CompositeValuesSourceBuilder source : getSources()) { + CompositeValuesSourceParserHelper.toXContent(source, builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(getSources().size()); + for (CompositeValuesSourceBuilder builder : getSources()) { + CompositeValuesSourceParserHelper.writeTo(builder, out); + } + } + + @Override + public int hashCode() { + return Objects.hash(getSources()); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final SourceConfig that = (SourceConfig) other; + + return Objects.equals(this.getSources(), that.getSources()); + } + + public static SourceConfig fromXContent(final XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public List> getSources() { + return sources; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java index 14bedcbe909b1..3efed4c21e801 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/PutFeatureIndexBuilderJobActionRequestTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.test.AbstractStreamableXContentTestCase; import org.elasticsearch.xpack.ml.featureindexbuilder.action.PutFeatureIndexBuilderJobAction.Request; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfigTests; import org.junit.Before; import java.io.IOException; @@ -40,7 +41,7 @@ protected boolean supportsUnknownFields() { @Override protected Request createTestInstance() { - FeatureIndexBuilderJobConfig config = new FeatureIndexBuilderJobConfig(randomAlphaOfLengthBetween(1,10)); + FeatureIndexBuilderJobConfig config = FeatureIndexBuilderJobConfigTests.randomFeatureIndexBuilderJobConfig(); return new Request(config); } diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java new file mode 100644 index 0000000000000..c2ed9e3d4e281 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AbstractSerializingFeatureIndexBuilderTestCase.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.junit.Before; + +import static java.util.Collections.emptyList; + +public abstract class AbstractSerializingFeatureIndexBuilderTestCase + extends AbstractSerializingTestCase { + + /** + * Test case that ensure aggregation named objects are registered + */ + + private NamedWriteableRegistry namedWriteableRegistry; + private NamedXContentRegistry namedXContentRegistry; + + @Before + public void registerAggregationNamedObjects() throws Exception { + // register aggregations as NamedWriteable + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); + namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables()); + namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents()); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return namedWriteableRegistry; + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return namedXContentRegistry; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java new file mode 100644 index 0000000000000..a46d598d8897b --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationConfigTests.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.apache.lucene.util.LuceneTestCase.AwaitsFix; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +// broken upstream +@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/33942") +public class AggregationConfigTests extends AbstractSerializingFeatureIndexBuilderTestCase { + + public static AggregationConfig randomAggregationConfig() { + AggregatorFactories.Builder builder = new AggregatorFactories.Builder(); + + for (int i = 0; i < randomIntBetween(1, 20); ++i) { + builder.addAggregator(getRandomSupportedAggregation()); + } + + return new AggregationConfig(builder); + } + + @Override + protected AggregationConfig doParseInstance(XContentParser parser) throws IOException { + // parseAggregators expects to be already inside the xcontent object + assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT)); + return AggregationConfig.fromXContent(parser); + } + + @Override + protected AggregationConfig createTestInstance() { + return randomAggregationConfig(); + } + + @Override + protected Reader instanceReader() { + return AggregationConfig::new; + } + + private static AggregationBuilder getRandomSupportedAggregation() { + final int numberOfSupportedAggs = 4; + switch (randomIntBetween(1, numberOfSupportedAggs)) { + case 1: + return AggregationBuilders.avg(randomAlphaOfLengthBetween(1, 10)); + case 2: + return AggregationBuilders.min(randomAlphaOfLengthBetween(1, 10)); + case 3: + return AggregationBuilders.max(randomAlphaOfLengthBetween(1, 10)); + case 4: + return AggregationBuilders.sum(randomAlphaOfLengthBetween(1, 10)); + } + + return null; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java new file mode 100644 index 0000000000000..847aa93e8ee31 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobConfigTests.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.junit.Before; + +import java.io.IOException; + +public class FeatureIndexBuilderJobConfigTests extends AbstractSerializingFeatureIndexBuilderTestCase { + + private String jobId; + + public static FeatureIndexBuilderJobConfig randomFeatureIndexBuilderJobConfig() { + // AggregationConfig disabled, see: https://github.com/elastic/elasticsearch/pull/33942 + return new FeatureIndexBuilderJobConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), + randomAlphaOfLengthBetween(1, 10), SourceConfigTests.randomSourceConfig(), + null /* AggregationConfigTests.randonAggregationConfig() */); + } + + @Before + public void setUpOptionalId() { + jobId = randomAlphaOfLengthBetween(1, 10); + } + + @Override + protected FeatureIndexBuilderJobConfig doParseInstance(XContentParser parser) throws IOException { + if (randomBoolean()) { + return FeatureIndexBuilderJobConfig.fromXContent(parser, jobId); + } else { + return FeatureIndexBuilderJobConfig.fromXContent(parser, null); + } + } + + @Override + protected FeatureIndexBuilderJobConfig createTestInstance() { + return randomFeatureIndexBuilderJobConfig(); + } + + @Override + protected Reader instanceReader() { + return FeatureIndexBuilderJobConfig::new; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfigTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfigTests.java new file mode 100644 index 0000000000000..e94754c825d27 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/SourceConfigTests.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class SourceConfigTests extends AbstractSerializingFeatureIndexBuilderTestCase { + + public static SourceConfig randomSourceConfig() { + int numSources = randomIntBetween(1, 10); + List> sources = new ArrayList<>(); + for (int i = 0; i < numSources; i++) { + sources.add(randomTermsSourceBuilder()); + } + return new SourceConfig(sources); + } + + @Override + protected SourceConfig doParseInstance(XContentParser parser) throws IOException { + return SourceConfig.fromXContent(parser); + } + + @Override + protected SourceConfig createTestInstance() { + return randomSourceConfig(); + } + + @Override + protected Reader instanceReader() { + return SourceConfig::new; + } + + private static TermsValuesSourceBuilder randomTermsSourceBuilder() { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10)); + if (randomBoolean()) { + terms.field(randomAlphaOfLengthBetween(1, 20)); + } else { + terms.script(new Script(randomAlphaOfLengthBetween(10, 20))); + } + terms.order(randomFrom(SortOrder.values())); + if (randomBoolean()) { + terms.missingBucket(true); + } + return terms; + } +}