From c32c4c8a0a05cc15a748287da197b6ced21dfcbb Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Nov 2018 16:16:39 +0100 Subject: [PATCH 1/6] add a stats endpoint --- .../FeatureIndexBuilder.java | 9 +- .../action/DataFrameJobStateAndStats.java | 103 +++++++++ .../action/GetDataFrameJobsStatsAction.java | 203 ++++++++++++++++++ .../TransportGetDataFrameJobsAction.java | 29 +-- .../TransportGetDataFrameJobsStatsAction.java | 104 +++++++++ .../job/AggregationResultUtils.java | 5 +- ...ats.java => DataFrameIndexerJobStats.java} | 14 +- .../job/FeatureIndexBuilderIndexer.java | 6 +- .../job/FeatureIndexBuilderJobTask.java | 4 + .../DataFramePersistentTaskUtils.java | 43 ++++ .../RestGetDataFrameJobsStatsAction.java | 37 ++++ .../DataFrameJobStateAndStatsTests.java | 37 ++++ ...tDataFrameJobsStatsActionRequestTests.java | 26 +++ .../job/AggregationResultUtilsTests.java | 5 +- .../job/DataFrameIndexerJobStatsTests.java | 33 +++ 15 files changed, 616 insertions(+), 42 deletions(-) create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java rename x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/{FeatureIndexBuilderJobStats.java => DataFrameIndexerJobStats.java} (77%) create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/persistence/DataFramePersistentTaskUtils.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestGetDataFrameJobsStatsAction.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStatsTests.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsActionRequestTests.java create mode 100644 x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java index a49d9be8e605c..eb6cc2ecb56be 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java @@ -38,11 +38,13 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.ml.featureindexbuilder.action.DeleteFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsStatsAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.PutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StopFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportDeleteFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportGetDataFrameJobsAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportGetDataFrameJobsStatsAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStopFeatureIndexBuilderJobAction; @@ -51,6 +53,7 @@ import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobState; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestDeleteFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestGetDataFrameJobsAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestGetDataFrameJobsStatsAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStopFeatureIndexBuilderJobAction; @@ -118,7 +121,8 @@ public List getRestHandlers(final Settings settings, final RestCont new RestStartFeatureIndexBuilderJobAction(settings, restController), new RestStopFeatureIndexBuilderJobAction(settings, restController), new RestDeleteFeatureIndexBuilderJobAction(settings, restController), - new RestGetDataFrameJobsAction(settings, restController) + new RestGetDataFrameJobsAction(settings, restController), + new RestGetDataFrameJobsStatsAction(settings, restController) ); } @@ -133,7 +137,8 @@ public List getRestHandlers(final Settings settings, final RestCont new ActionHandler<>(StartFeatureIndexBuilderJobAction.INSTANCE, TransportStartFeatureIndexBuilderJobAction.class), new ActionHandler<>(StopFeatureIndexBuilderJobAction.INSTANCE, TransportStopFeatureIndexBuilderJobAction.class), new ActionHandler<>(DeleteFeatureIndexBuilderJobAction.INSTANCE, TransportDeleteFeatureIndexBuilderJobAction.class), - new ActionHandler<>(GetDataFrameJobsAction.INSTANCE, TransportGetDataFrameJobsAction.class) + new ActionHandler<>(GetDataFrameJobsAction.INSTANCE, TransportGetDataFrameJobsAction.class), + new ActionHandler<>(GetDataFrameJobsStatsAction.INSTANCE, TransportGetDataFrameJobsStatsAction.class) ); } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java new file mode 100644 index 0000000000000..d84a948d5eecb --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobState; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.DataFrameIndexerJobStats; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + +import java.io.IOException; +import java.util.Objects; + +public class DataFrameJobStateAndStats implements Writeable, ToXContentObject { + + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField STATS_FIELD = new ParseField("stats"); + + private final String id; + private final FeatureIndexBuilderJobState jobState; + private final DataFrameIndexerJobStats jobStats; + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + GetDataFrameJobsAction.NAME, + a -> new DataFrameJobStateAndStats((String) a[0], (FeatureIndexBuilderJobState) a[1], (DataFrameIndexerJobStats) a[2])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), FeatureIndexBuilderJob.ID); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), FeatureIndexBuilderJobState.PARSER::apply, STATE_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> DataFrameIndexerJobStats.fromXContent(p), STATS_FIELD); + } + + public DataFrameJobStateAndStats(String id, FeatureIndexBuilderJobState state, DataFrameIndexerJobStats stats) { + this.id = id; + this.jobState = state; + this.jobStats = stats; + } + + public DataFrameJobStateAndStats(StreamInput in) throws IOException { + this.id = in.readString(); + this.jobState = new FeatureIndexBuilderJobState(in); + this.jobStats = new DataFrameIndexerJobStats(in); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(FeatureIndexBuilderJob.ID.getPreferredName(), id); + builder.field(STATE_FIELD.getPreferredName(), jobState); + builder.field(STATS_FIELD.getPreferredName(), jobStats); + builder.endObject(); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + jobState.writeTo(out); + jobStats.writeTo(out); + } + + @Override + public int hashCode() { + return Objects.hash(id, jobState, jobStats); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DataFrameJobStateAndStats that = (DataFrameJobStateAndStats) other; + + return Objects.equals(this.id, that.id) && Objects.equals(this.jobState, that.jobState) + && Objects.equals(this.jobStats, that.jobStats); + } + + public String getId() { + return id; + } + + public DataFrameIndexerJobStats getJobStats() { + return jobStats; + } + + public FeatureIndexBuilderJobState getJobState() { + return jobState; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java new file mode 100644 index 0000000000000..d5ec9e30915c9 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java @@ -0,0 +1,203 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class GetDataFrameJobsStatsAction extends Action { + + public static final GetDataFrameJobsStatsAction INSTANCE = new GetDataFrameJobsStatsAction(); + public static final String NAME = "cluster:monitor/data_frame_stats/get"; + public static final ParseField COUNT = new ParseField("count"); + public static final ParseField JOBS = new ParseField("jobs"); + + public GetDataFrameJobsStatsAction() { + super(NAME); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends BaseTasksRequest implements ToXContent { + private String id; + + public Request(String id) { + if (Strings.isNullOrEmpty(id) || id.equals("*")) { + this.id = MetaData.ALL; + } else { + this.id = id; + } + } + + public Request() {} + + @Override + public boolean match(Task task) { + // If we are retrieving all the jobs, the task description does not contain the id + if (id.equals(MetaData.ALL)) { + return task.getDescription().startsWith(FeatureIndexBuilderJob.PERSISTENT_TASK_DESCRIPTION_PREFIX); + } + // Otherwise find the task by ID + return task.getDescription().equals(FeatureIndexBuilderJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + id); + } + + public String getId() { + return id; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + id = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(FeatureIndexBuilderJob.ID.getPreferredName(), id); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(id, other.id); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, GetDataFrameJobsStatsAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { + private List jobResponses; + + public Response(List jobResponses) { + super(Collections.emptyList(), Collections.emptyList()); + this.jobResponses = jobResponses; + } + + public Response(List jobResponses, List taskFailures, + List nodeFailures) { + super(taskFailures, nodeFailures); + this.jobResponses = jobResponses; + } + + public Response() { + super(Collections.emptyList(), Collections.emptyList()); + } + + public Response(StreamInput in) throws IOException { + super(Collections.emptyList(), Collections.emptyList()); + readFrom(in); + } + + public List getJobResponses() { + return jobResponses; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + jobResponses = in.readList(DataFrameJobStateAndStats::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(jobResponses); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(COUNT.getPreferredName(), jobResponses.size()); + // XContentBuilder does not support passing the params object for Iterables + builder.field(JOBS.getPreferredName()); + builder.startArray(); + for (DataFrameJobStateAndStats jobResponse : jobResponses) { + jobResponse.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(jobResponses); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final Response that = (Response) other; + return Objects.equals(this.jobResponses, that.jobResponses); + } + + @Override + public final String toString() { + return Strings.toString(this); + } + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsAction.java index 941375dc31331..46b44c70c5fb8 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsAction.java @@ -19,15 +19,14 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.discovery.MasterNotDiscoveredException; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsAction.Request; import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsAction.Response; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask; +import org.elasticsearch.xpack.ml.featureindexbuilder.persistence.DataFramePersistentTaskUtils; import java.io.IOException; import java.util.Collection; @@ -80,7 +79,7 @@ protected void doExecute(Task task, Request request, ActionListener li final DiscoveryNodes nodes = state.nodes(); if (nodes.isLocalNodeElectedMaster()) { - if (stateHasDataFrameJobs(request, state)) { + if (DataFramePersistentTaskUtils.stateHasDataFrameJobs(request.getId(), state)) { super.doExecute(task, request, listener); } else { // If we couldn't find the job in the persistent task CS, it means it was deleted prior to this GET @@ -100,28 +99,4 @@ protected void doExecute(Task task, Request request, ActionListener li } } } - - /** - * Check to see if the PersistentTask's cluster state contains the job(s) we are interested in - */ - static boolean stateHasDataFrameJobs(Request request, ClusterState state) { - boolean hasJobs = false; - PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - - if (pTasksMeta != null) { - // If the request was for _all jobs, we need to look through the list of - // persistent tasks and see if at least once has a DataFrameJob param - if (request.getId().equals(MetaData.ALL)) { - hasJobs = pTasksMeta.tasks() - .stream() - .anyMatch(persistentTask -> persistentTask.getTaskName().equals(FeatureIndexBuilder.TASK_NAME)); - - } else if (pTasksMeta.getTask(request.getId()) != null) { - // If we're looking for a single job, we can just check directly - hasJobs = true; - } - } - return hasJobs; - } - } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java new file mode 100644 index 0000000000000..adaa2e76a4fea --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsStatsAction.Request; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsStatsAction.Response; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask; +import org.elasticsearch.xpack.ml.featureindexbuilder.persistence.DataFramePersistentTaskUtils; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class TransportGetDataFrameJobsStatsAction extends + TransportTasksAction { + + @Inject + public TransportGetDataFrameJobsStatsAction(TransportService transportService, ActionFilters actionFilters, + ClusterService clusterService) { + super(GetDataFrameJobsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new, + ThreadPool.Names.SAME); + } + + @Override + protected Response newResponse(Request request, List tasks, List taskOperationFailures, + List failedNodeExceptions) { + List responses = tasks.stream().map(GetDataFrameJobsStatsAction.Response::getJobResponses) + .flatMap(Collection::stream).collect(Collectors.toList()); + return new Response(responses, taskOperationFailures, failedNodeExceptions); + } + + @Override + protected Response readTaskResponse(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void taskOperation(Request request, FeatureIndexBuilderJobTask task, ActionListener listener) { + List jobsStateAndStats = Collections.emptyList(); + + assert task.getConfig().getId().equals(request.getId()) || request.getId().equals(MetaData.ALL); + + // Little extra insurance, make sure we only return jobs that aren't cancelled + if (task.isCancelled() == false) { + DataFrameJobStateAndStats jobStateAndStats = new DataFrameJobStateAndStats(task.getConfig().getId(), task.getState(), + task.getStats()); + jobsStateAndStats = Collections.singletonList(jobStateAndStats); + } + + listener.onResponse(new Response(jobsStateAndStats)); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + + if (nodes.isLocalNodeElectedMaster()) { + if (DataFramePersistentTaskUtils.stateHasDataFrameJobs(request.getId(), state)) { + super.doExecute(task, request, listener); + } else { + // If we couldn't find the job in the persistent task CS, it means it was deleted prior to this GET + // and we can just send an empty response, no need to go looking for the allocated task + listener.onResponse(new Response(Collections.emptyList())); + } + + } else { + // Delegates GetJobs to elected master node, so it becomes the coordinating node. + // Non-master nodes may have a stale cluster state that shows jobs which are cancelled + // on the master, which makes testing difficult. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master nodes")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, Response::new)); + } + } + } +} \ No newline at end of file diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java index ddd4c12d54798..6a9304010bf71 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java @@ -30,11 +30,14 @@ final class AggregationResultUtils { * @param agg The aggregation result * @param sources The original sources used for querying * @param aggregationBuilders the aggregation used for querying + * @param dataFrameIndexerJobStats stats collector * @return a map containing the results of the aggregation in a consumable way */ public static Stream> extractCompositeAggregationResults(CompositeAggregation agg, - List> sources, Collection aggregationBuilders) { + List> sources, Collection aggregationBuilders, DataFrameIndexerJobStats dataFrameIndexerJobStats) { return agg.getBuckets().stream().map(bucket -> { + dataFrameIndexerJobStats.incrementNumDocuments(bucket.getDocCount()); + Map document = new HashMap<>(); for (CompositeValuesSourceBuilder source : sources) { String destinationFieldName = source.name(); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStats.java similarity index 77% rename from x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java rename to x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStats.java index a7c9392800f09..41377e88d7e3e 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobStats.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStats.java @@ -17,15 +17,15 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -public class FeatureIndexBuilderJobStats extends IndexerJobStats { +public class DataFrameIndexerJobStats extends IndexerJobStats { private static ParseField NUM_PAGES = new ParseField("pages_processed"); private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed"); private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed"); private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count"); - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME.getPreferredName(), - args -> new FeatureIndexBuilderJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); + args -> new DataFrameIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); static { PARSER.declareLong(constructorArg(), NUM_PAGES); @@ -34,15 +34,15 @@ public class FeatureIndexBuilderJobStats extends IndexerJobStats { PARSER.declareLong(constructorArg(), NUM_INVOCATIONS); } - public FeatureIndexBuilderJobStats() { + public DataFrameIndexerJobStats() { super(); } - public FeatureIndexBuilderJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) { + public DataFrameIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) { super(numPages, numInputDocuments, numOuputDocuments, numInvocations); } - public FeatureIndexBuilderJobStats(StreamInput in) throws IOException { + public DataFrameIndexerJobStats(StreamInput in) throws IOException { super(in); } @@ -57,7 +57,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public static FeatureIndexBuilderJobStats fromXContent(XContentParser parser) { + public static DataFrameIndexerJobStats fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); } catch (IOException e) { diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index c1d6e987a4691..e3d3de8c432d0 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -36,7 +36,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.ml.featureindexbuilder.persistence.DataframeIndex.DOC_TYPE; -public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, FeatureIndexBuilderJobStats> { +public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer, DataFrameIndexerJobStats> { private static final String COMPOSITE_AGGREGATION_NAME = "_data_frame"; private static final Logger logger = LogManager.getLogger(FeatureIndexBuilderIndexer.class); @@ -44,7 +44,7 @@ public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer initialState, Map initialPosition) { - super(executor, initialState, initialPosition, new FeatureIndexBuilderJobStats()); + super(executor, initialState, initialPosition, new DataFrameIndexerJobStats()); this.job = job; } @@ -78,7 +78,7 @@ private Stream processBucketsToIndexRequests(CompositeAggregation List> sources = job.getConfig().getSourceConfig().getSources(); Collection aggregationBuilders = job.getConfig().getAggregationConfig().getAggregatorFactories(); - return AggregationResultUtils.extractCompositeAggregationResults(agg, sources, aggregationBuilders).map(document -> { + return AggregationResultUtils.extractCompositeAggregationResults(agg, sources, aggregationBuilders, getStats()).map(document -> { XContentBuilder builder; try { builder = jsonBuilder(); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java index b2e8e813a83d0..659cec9fbb8ae 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java @@ -61,6 +61,10 @@ public FeatureIndexBuilderJobState getState() { return new FeatureIndexBuilderJobState(indexer.getState(), indexer.getPosition()); } + public DataFrameIndexerJobStats getStats() { + return indexer.getStats(); + } + public synchronized void start(ActionListener listener) { // TODO: safeguards missing, see rollup code indexer.start(); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/persistence/DataFramePersistentTaskUtils.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/persistence/DataFramePersistentTaskUtils.java new file mode 100644 index 0000000000000..715ec40b8e2f1 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/persistence/DataFramePersistentTaskUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.persistence; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; + +public final class DataFramePersistentTaskUtils { + + private DataFramePersistentTaskUtils() { + } + + /** + * Check to see if the PersistentTask's cluster state contains the job(s) we + * are interested in + */ + public static boolean stateHasDataFrameJobs(String id, ClusterState state) { + boolean hasJobs = false; + PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + + if (pTasksMeta != null) { + // If the request was for _all jobs, we need to look through the + // list of + // persistent tasks and see if at least once has a DataFrameJob + // param + if (id.equals(MetaData.ALL)) { + hasJobs = pTasksMeta.tasks().stream() + .anyMatch(persistentTask -> persistentTask.getTaskName().equals(FeatureIndexBuilder.TASK_NAME)); + + } else if (pTasksMeta.getTask(id) != null) { + // If we're looking for a single job, we can just check directly + hasJobs = true; + } + } + return hasJobs; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestGetDataFrameJobsStatsAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestGetDataFrameJobsStatsAction.java new file mode 100644 index 0000000000000..ca01ca43caf2a --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestGetDataFrameJobsStatsAction.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.rest.action; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsStatsAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + +public class RestGetDataFrameJobsStatsAction extends BaseRestHandler { + + public RestGetDataFrameJobsStatsAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.GET, FeatureIndexBuilder.BASE_PATH_JOBS_BY_ID + "_stats", this); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { + String id = restRequest.param(FeatureIndexBuilderJob.ID.getPreferredName()); + GetDataFrameJobsStatsAction.Request request = new GetDataFrameJobsStatsAction.Request(id); + return channel -> client.execute(GetDataFrameJobsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + + @Override + public String getName() { + return "data_frame_get_jobs_stats_action"; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStatsTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStatsTests.java new file mode 100644 index 0000000000000..d23d23f97c91a --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStatsTests.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.AbstractSerializingFeatureIndexBuilderTestCase; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.DataFrameIndexerJobStatsTests; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobStateTests; + +import java.io.IOException; + +public class DataFrameJobStateAndStatsTests + extends AbstractSerializingFeatureIndexBuilderTestCase { + + @Override + protected DataFrameJobStateAndStats doParseInstance(XContentParser parser) throws IOException { + return DataFrameJobStateAndStats.PARSER.apply(parser, null); + } + + @Override + protected DataFrameJobStateAndStats createTestInstance() { + return new DataFrameJobStateAndStats(randomAlphaOfLengthBetween(1,10), + FeatureIndexBuilderJobStateTests.randomFeatureIndexBuilderJobState(), + DataFrameIndexerJobStatsTests.randomStats()); + } + + @Override + protected Reader instanceReader() { + return DataFrameJobStateAndStats::new; + } + +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsActionRequestTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsActionRequestTests.java new file mode 100644 index 0000000000000..2d311cd9f6c13 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsActionRequestTests.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsStatsAction.Request; + +public class GetDataFrameJobsStatsActionRequestTests extends AbstractStreamableTestCase { + @Override + protected Request createTestInstance() { + if (randomBoolean()) { + return new Request(MetaData.ALL); + } + return new Request(randomAlphaOfLengthBetween(1, 20)); + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java index 9e3ef63fd7e8e..82561d7d8f536 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java @@ -271,13 +271,14 @@ aggTypedName2, asMap( private void executeTest(List> sources, Collection aggregationBuilders, Map input, List> expected) throws IOException { + DataFrameIndexerJobStats stats = new DataFrameIndexerJobStats(); XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); builder.map(input); try (XContentParser parser = createParser(builder)) { CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature"); - List> result = AggregationResultUtils.extractCompositeAggregationResults(agg, sources, aggregationBuilders) - .collect(Collectors.toList()); + List> result = AggregationResultUtils + .extractCompositeAggregationResults(agg, sources, aggregationBuilders, stats).collect(Collectors.toList()); assertEquals(expected, result); } diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java new file mode 100644 index 0000000000000..150d4d9b8f6d4 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +public class DataFrameIndexerJobStatsTests extends AbstractSerializingTestCase{ + @Override + protected DataFrameIndexerJobStats createTestInstance() { + return randomStats(); + } + + @Override + protected Writeable.Reader instanceReader() { + return DataFrameIndexerJobStats::new; + } + + @Override + protected DataFrameIndexerJobStats doParseInstance(XContentParser parser) { + return DataFrameIndexerJobStats.fromXContent(parser); + } + + public static DataFrameIndexerJobStats randomStats() { + return new DataFrameIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()); + } +} From a45302fa7918d532ec069e1b15c3033bd573e0c8 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Nov 2018 17:12:28 +0100 Subject: [PATCH 2/6] fix checkstyle --- .../ml/featureindexbuilder/job/AggregationResultUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java index 6a9304010bf71..63b06cb369ba1 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java @@ -34,7 +34,8 @@ final class AggregationResultUtils { * @return a map containing the results of the aggregation in a consumable way */ public static Stream> extractCompositeAggregationResults(CompositeAggregation agg, - List> sources, Collection aggregationBuilders, DataFrameIndexerJobStats dataFrameIndexerJobStats) { + List> sources, Collection aggregationBuilders, + DataFrameIndexerJobStats dataFrameIndexerJobStats) { return agg.getBuckets().stream().map(bucket -> { dataFrameIndexerJobStats.incrementNumDocuments(bucket.getDocCount()); From 8c46677a724cec2d12e89af2ba9fedada2ac1971 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Nov 2018 18:18:18 +0100 Subject: [PATCH 3/6] renamings --- .../action/GetDataFrameJobsStatsAction.java | 26 +++++++++---------- .../TransportGetDataFrameJobsStatsAction.java | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java index d5ec9e30915c9..f084b7849eba1 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java @@ -123,17 +123,17 @@ protected RequestBuilder(ElasticsearchClient client, GetDataFrameJobsStatsAction } public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { - private List jobResponses; + private List jobsStateAndStats; - public Response(List jobResponses) { + public Response(List jobsStateAndStats) { super(Collections.emptyList(), Collections.emptyList()); - this.jobResponses = jobResponses; + this.jobsStateAndStats = jobsStateAndStats; } - public Response(List jobResponses, List taskFailures, + public Response(List jobsStateAndStats, List taskFailures, List nodeFailures) { super(taskFailures, nodeFailures); - this.jobResponses = jobResponses; + this.jobsStateAndStats = jobsStateAndStats; } public Response() { @@ -145,30 +145,30 @@ public Response(StreamInput in) throws IOException { readFrom(in); } - public List getJobResponses() { - return jobResponses; + public List getJobsStateAndStats() { + return jobsStateAndStats; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - jobResponses = in.readList(DataFrameJobStateAndStats::new); + jobsStateAndStats = in.readList(DataFrameJobStateAndStats::new); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeList(jobResponses); + out.writeList(jobsStateAndStats); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(COUNT.getPreferredName(), jobResponses.size()); + builder.field(COUNT.getPreferredName(), jobsStateAndStats.size()); // XContentBuilder does not support passing the params object for Iterables builder.field(JOBS.getPreferredName()); builder.startArray(); - for (DataFrameJobStateAndStats jobResponse : jobResponses) { + for (DataFrameJobStateAndStats jobResponse : jobsStateAndStats) { jobResponse.toXContent(builder, params); } builder.endArray(); @@ -178,7 +178,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public int hashCode() { - return Objects.hash(jobResponses); + return Objects.hash(jobsStateAndStats); } @Override @@ -192,7 +192,7 @@ public boolean equals(Object other) { } final Response that = (Response) other; - return Objects.equals(this.jobResponses, that.jobResponses); + return Objects.equals(this.jobsStateAndStats, that.jobsStateAndStats); } @Override diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java index adaa2e76a4fea..efb90a601a596 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportGetDataFrameJobsStatsAction.java @@ -49,7 +49,7 @@ public TransportGetDataFrameJobsStatsAction(TransportService transportService, A @Override protected Response newResponse(Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { - List responses = tasks.stream().map(GetDataFrameJobsStatsAction.Response::getJobResponses) + List responses = tasks.stream().map(GetDataFrameJobsStatsAction.Response::getJobsStateAndStats) .flatMap(Collection::stream).collect(Collectors.toList()); return new Response(responses, taskOperationFailures, failedNodeExceptions); } From 092171fd5be1346751efd76166d33a353ae24daa Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 26 Nov 2018 20:03:49 +0100 Subject: [PATCH 4/6] test doc counts to stats test --- .../job/AggregationResultUtilsTests.java | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java index 82561d7d8f536..4a62c56623ad3 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java @@ -61,6 +61,7 @@ public class AggregationResultUtilsTests extends ESTestCase { private final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(namedXContents); private final String KEY = Aggregation.CommonFields.KEY.getPreferredName(); + private final String DOC_COUNT = Aggregation.CommonFields.DOC_COUNT.getPreferredName(); // aggregations potentially useful for writing tests, to be expanded as necessary private static final List namedXContents; @@ -107,17 +108,20 @@ public void testExtractCompositeAggregationResults() throws IOException { KEY, asMap( targetField, "ID1"), aggTypedName, asMap( - "value", 42.33)), + "value", 42.33), + DOC_COUNT, 8), asMap( KEY, asMap( targetField, "ID2"), aggTypedName, asMap( - "value", 28.99)), + "value", 28.99), + DOC_COUNT, 3), asMap( KEY, asMap( targetField, "ID3"), aggTypedName, asMap( - "value", 12.55)) + "value", 12.55), + DOC_COUNT, 9) )); List> expected = asList( @@ -135,7 +139,7 @@ aggTypedName, asMap( ) ); - executeTest(sources, aggregationBuilders, input, expected); + executeTest(sources, aggregationBuilders, input, expected, 20); } public void testExtractCompositeAggregationResultsMultiSources() throws IOException { @@ -160,28 +164,32 @@ KEY, asMap( targetField2, "ID1_2" ), aggTypedName, asMap( - "value", 42.33)), + "value", 42.33), + DOC_COUNT, 1), asMap( KEY, asMap( targetField, "ID1", targetField2, "ID2_2" ), aggTypedName, asMap( - "value", 8.4)), + "value", 8.4), + DOC_COUNT, 2), asMap( KEY, asMap( targetField, "ID2", targetField2, "ID1_2" ), aggTypedName, asMap( - "value", 28.99)), + "value", 28.99), + DOC_COUNT, 3), asMap( KEY, asMap( targetField, "ID3", targetField2, "ID2_2" ), aggTypedName, asMap( - "value", 12.55)) + "value", 12.55), + DOC_COUNT, 4) )); List> expected = asList( @@ -206,7 +214,7 @@ aggTypedName, asMap( aggName, 12.55 ) ); - executeTest(sources, aggregationBuilders, input, expected); + executeTest(sources, aggregationBuilders, input, expected, 10); } public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException { @@ -232,21 +240,24 @@ KEY, asMap( aggTypedName, asMap( "value", 42.33), aggTypedName2, asMap( - "value", 9.9)), + "value", 9.9), + DOC_COUNT, 111), asMap( KEY, asMap( targetField, "ID2"), aggTypedName, asMap( "value", 28.99), aggTypedName2, asMap( - "value", 222.33)), + "value", 222.33), + DOC_COUNT, 88), asMap( KEY, asMap( targetField, "ID3"), aggTypedName, asMap( "value", 12.55), aggTypedName2, asMap( - "value", -2.44)) + "value", -2.44), + DOC_COUNT, 1) )); List> expected = asList( @@ -266,11 +277,11 @@ aggTypedName2, asMap( aggName2, -2.44 ) ); - executeTest(sources, aggregationBuilders, input, expected); + executeTest(sources, aggregationBuilders, input, expected, 200); } private void executeTest(List> sources, Collection aggregationBuilders, - Map input, List> expected) throws IOException { + Map input, List> expected, long expectedDocCounts) throws IOException { DataFrameIndexerJobStats stats = new DataFrameIndexerJobStats(); XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); builder.map(input); @@ -281,6 +292,7 @@ private void executeTest(List> sources, Collecti .extractCompositeAggregationResults(agg, sources, aggregationBuilders, stats).collect(Collectors.toList()); assertEquals(expected, result); + assertEquals(expectedDocCounts, stats.getNumDocuments()); } } From 9c7a8ace99d3ffc44448f1da687fdbdec1bf09be Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 28 Nov 2018 11:21:42 +0100 Subject: [PATCH 5/6] address review comments --- .../action/DataFrameJobStateAndStats.java | 6 +++--- .../action/GetDataFrameJobsStatsAction.java | 9 ++------- .../persistence/DataFramePersistentTaskUtils.java | 6 ++---- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java index d84a948d5eecb..2a44aace47fd5 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DataFrameJobStateAndStats.java @@ -40,9 +40,9 @@ public class DataFrameJobStateAndStats implements Writeable, ToXContentObject { } public DataFrameJobStateAndStats(String id, FeatureIndexBuilderJobState state, DataFrameIndexerJobStats stats) { - this.id = id; - this.jobState = state; - this.jobStats = stats; + this.id = Objects.requireNonNull(id); + this.jobState = Objects.requireNonNull(state); + this.jobStats = Objects.requireNonNull(stats); } public DataFrameJobStateAndStats(StreamInput in) throws IOException { diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java index f084b7849eba1..be2b70084676e 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/GetDataFrameJobsStatsAction.java @@ -138,6 +138,7 @@ public Response(List jobsStateAndStats, List persistentTask.getTaskName().equals(FeatureIndexBuilder.TASK_NAME)); From c9137dd9a60207ae6af4c3dbd3d74ac6d1ed7e49 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 28 Nov 2018 11:48:32 +0100 Subject: [PATCH 6/6] add extra field for stats --- .../job/DataFrameIndexerJobStats.java | 27 ++++++++++++++++--- .../job/DataFrameIndexerJobStatsTests.java | 5 ++-- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStats.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStats.java index 41377e88d7e3e..7cf5f430db25d 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStats.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStats.java @@ -22,24 +22,39 @@ public class DataFrameIndexerJobStats extends IndexerJobStats { private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed"); private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed"); private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count"); + private static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms"); + private static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms"); + private static ParseField INDEX_TOTAL = new ParseField("index_total"); + private static ParseField SEARCH_TOTAL = new ParseField("search_total"); + private static ParseField SEARCH_FAILURES = new ParseField("search_failures"); + private static ParseField INDEX_FAILURES = new ParseField("index_failures"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( NAME.getPreferredName(), - args -> new DataFrameIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); + args -> new DataFrameIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], + (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9])); static { PARSER.declareLong(constructorArg(), NUM_PAGES); PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS); PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS); PARSER.declareLong(constructorArg(), NUM_INVOCATIONS); + PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS); + PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS); + PARSER.declareLong(constructorArg(), INDEX_TOTAL); + PARSER.declareLong(constructorArg(), SEARCH_TOTAL); + PARSER.declareLong(constructorArg(), INDEX_FAILURES); + PARSER.declareLong(constructorArg(), SEARCH_FAILURES); } public DataFrameIndexerJobStats() { super(); } - public DataFrameIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) { - super(numPages, numInputDocuments, numOuputDocuments, numInvocations); + public DataFrameIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations, long indexTime, + long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) { + super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal, indexFailures, + searchFailures); } public DataFrameIndexerJobStats(StreamInput in) throws IOException { @@ -53,6 +68,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments); builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments); builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations); + builder.field(INDEX_TIME_IN_MS.getPreferredName(), indexTime); + builder.field(INDEX_TOTAL.getPreferredName(), indexTotal); + builder.field(INDEX_FAILURES.getPreferredName(), indexFailures); + builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime); + builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal); + builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures); builder.endObject(); return builder; } diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java index 150d4d9b8f6d4..a71368d3ef031 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/DataFrameIndexerJobStatsTests.java @@ -27,7 +27,8 @@ protected DataFrameIndexerJobStats doParseInstance(XContentParser parser) { } public static DataFrameIndexerJobStats randomStats() { - return new DataFrameIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong()); + return new DataFrameIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()); } }