Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@
public class TransportPutFeatureIndexBuilderJobAction
extends TransportMasterNodeAction<PutFeatureIndexBuilderJobAction.Request, PutFeatureIndexBuilderJobAction.Response> {

// TODO: hack, to be replaced
private static final String PIVOT_INDEX = "pivot-reviews";

private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
Expand Down Expand Up @@ -76,7 +73,7 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);

FeatureIndexBuilderJob job = createFeatureIndexBuilderJob(request.getConfig(), threadPool);
createIndex(client, job.getConfig().getId());
createIndex(client, job.getConfig().getDestinationIndex());
startPersistentTask(job, listener, persistentTasksService);
}

Expand Down Expand Up @@ -105,9 +102,8 @@ protected ClusterBlockException checkBlock(PutFeatureIndexBuilderJobAction.Reque
*
* TODO: everything below will be replaced with proper implementation read from job configuration
*/
private static void createIndex(Client client, String suffix) {
private static void createIndex(Client client, String indexName) {

String indexName = PIVOT_INDEX + "_" + suffix;
CreateIndexRequest request = new CreateIndexRequest(indexName);

request.settings(Settings.builder() // <1>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.featureindexbuilder.action;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
Expand All @@ -23,26 +24,26 @@
import java.io.IOException;
import java.util.List;

public class TransportStopFeatureIndexBuilderJobAction extends TransportTasksAction<FeatureIndexBuilderJobTask, StopFeatureIndexBuilderJobAction.Request,
StopFeatureIndexBuilderJobAction.Response, StopFeatureIndexBuilderJobAction.Response> {

public class TransportStopFeatureIndexBuilderJobAction extends
TransportTasksAction<FeatureIndexBuilderJobTask, StopFeatureIndexBuilderJobAction.Request,
StopFeatureIndexBuilderJobAction.Response, StopFeatureIndexBuilderJobAction.Response> {

@Inject
public TransportStopFeatureIndexBuilderJobAction(Settings settings, TransportService transportService,
ActionFilters actionFilters, ClusterService clusterService) {
public TransportStopFeatureIndexBuilderJobAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService) {
super(settings, StopFeatureIndexBuilderJobAction.NAME, clusterService, transportService, actionFilters,
StopFeatureIndexBuilderJobAction.Request::new, StopFeatureIndexBuilderJobAction.Response::new, ThreadPool.Names.SAME);
}

@Override
protected void doExecute(Task task, StopFeatureIndexBuilderJobAction.Request request, ActionListener<StopFeatureIndexBuilderJobAction.Response> listener) {
protected void doExecute(Task task, StopFeatureIndexBuilderJobAction.Request request,
ActionListener<StopFeatureIndexBuilderJobAction.Response> listener) {
super.doExecute(task, request, listener);
}

@Override
protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request,
FeatureIndexBuilderJobTask jobTask,
ActionListener<StopFeatureIndexBuilderJobAction.Response> listener) {
protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, FeatureIndexBuilderJobTask jobTask,
ActionListener<StopFeatureIndexBuilderJobAction.Response> listener) {
if (jobTask.getConfig().getId().equals(request.getId())) {
jobTask.stop(listener);
} else {
Expand All @@ -52,19 +53,18 @@ protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request,
}

@Override
protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndexBuilderJobAction.Request request, List<StopFeatureIndexBuilderJobAction.Response> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndexBuilderJobAction.Request request,
List<StopFeatureIndexBuilderJobAction.Response> tasks, List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {

if (taskOperationFailures.isEmpty() == false) {
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(taskOperationFailures.get(0).getCause());
throw ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
} else if (failedNodeExceptions.isEmpty() == false) {
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(failedNodeExceptions.get(0));
throw ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
}

// Either the job doesn't exist (the user didn't create it yet) or was deleted after the Stop API executed.
// Either the job doesn't exist (the user didn't create it yet) or was deleted
// after the Stop API executed.
// In either case, let the user know
if (tasks.size() == 0) {
throw new ResourceNotFoundException("Task for Feature Index Builder Job [" + request.getId() + "] not found");
Expand All @@ -80,5 +80,4 @@ protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndex
protected StopFeatureIndexBuilderJobAction.Response readTaskResponse(StreamInput in) throws IOException {
return new StopFeatureIndexBuilderJobAction.Response(in);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.featureindexbuilder.job;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import java.io.IOException;
import java.util.List;
import java.util.Objects;

/*
* Wrapper for the aggregations config part of a composite aggregation.
*
* For now just wraps aggregations from composite aggs.
*
*/
public class AggregationConfig implements Writeable, ToXContentObject {

private final AggregatorFactories.Builder aggregatorFactoryBuilder;

public AggregationConfig(AggregatorFactories.Builder aggregatorFactoryBuilder) {
this.aggregatorFactoryBuilder = aggregatorFactoryBuilder;
}

AggregationConfig(final StreamInput in) throws IOException {
aggregatorFactoryBuilder = new AggregatorFactories.Builder(in);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return aggregatorFactoryBuilder.toXContent(builder, params);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
aggregatorFactoryBuilder.writeTo(out);
}

public List<AggregationBuilder> getAggregatorFactories() {
return aggregatorFactoryBuilder.getAggregatorFactories();
}

public static AggregationConfig fromXContent(final XContentParser parser) throws IOException {
return new AggregationConfig(AggregatorFactories.parseAggregators(parser));
}

@Override
public int hashCode() {
return Objects.hash(aggregatorFactoryBuilder);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

final AggregationConfig that = (AggregationConfig) other;

return Objects.equals(this.aggregatorFactoryBuilder, that.aggregatorFactoryBuilder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,18 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand All @@ -37,8 +35,6 @@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, FeatureIndexBuilderJobStats> {
private static final String PIVOT_INDEX = "pivot-reviews";
private static final String SOURCE_INDEX = "anonreviews";

private static final Logger logger = Logger.getLogger(FeatureIndexBuilderIndexer.class.getName());
private FeatureIndexBuilderJob job;
Expand Down Expand Up @@ -71,55 +67,49 @@ protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchRe
* TODO: replace with proper implementation
*/
private List<IndexRequest> processBuckets(CompositeAggregation agg) {
// for now only 1 source supported
String destinationFieldName = job.getConfig().getSourceConfig().getSources().get(0).name();
String aggName = job.getConfig().getAggregationConfig().getAggregatorFactories().get(0).getName();

return agg.getBuckets().stream().map(b -> {
InternalAvg avgAgg = b.getAggregations().get("avg_rating");
NumericMetricsAggregation.SingleValue aggResult = b.getAggregations().get(aggName);
XContentBuilder builder;
try {
builder = jsonBuilder();

builder.startObject();
builder.field("reviewerId", b.getKey().get("reviewerId"));
builder.field("avg_rating", avgAgg.getValue());
builder.field(destinationFieldName, b.getKey().get(destinationFieldName));
builder.field(aggName, aggResult.value());
builder.endObject();
} catch (IOException e) {
throw new UncheckedIOException(e);
}

String indexName = PIVOT_INDEX + "_" + job.getConfig().getId();
String indexName = job.getConfig().getDestinationIndex();
IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder);
return request;
}).collect(Collectors.toList());
}

@Override
protected SearchRequest buildSearchRequest() {

final Map<String, Object> position = getPosition();
SearchRequest request = buildFeatureQuery(position);
return request;
}

/*
* Mocked demo case
*
* TODO: everything below will be replaced with proper implementation read from job configuration
*/
private static SearchRequest buildFeatureQuery(Map<String, Object> after) {
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
SearchRequest searchRequest = new SearchRequest(SOURCE_INDEX);
SearchRequest searchRequest = new SearchRequest(job.getConfig().getIndexPattern());

List<CompositeValuesSourceBuilder<?>> sources = new ArrayList<>();
sources.add(new TermsValuesSourceBuilder("reviewerId").field("reviewerId"));
List<CompositeValuesSourceBuilder<?>> sources = job.getConfig().getSourceConfig().getSources();

CompositeAggregationBuilder compositeAggregation = new CompositeAggregationBuilder("feature", sources);
compositeAggregation.size(1000);

if (after != null) {
compositeAggregation.aggregateAfter(after);
if (position != null) {
compositeAggregation.aggregateAfter(position);
}

for (AggregationBuilder agg : job.getConfig().getAggregationConfig().getAggregatorFactories()) {
compositeAggregation.subAggregation(agg);
}

compositeAggregation.subAggregation(AggregationBuilders.avg("avg_rating").field("rating"));
compositeAggregation.subAggregation(AggregationBuilders.cardinality("dc_vendors").field("vendorId"));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.aggregation(compositeAggregation);
sourceBuilder.size(0);
Expand Down
Loading