Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.DeleteFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsStatsAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.PutFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.StopFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportDeleteFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportGetDataFrameJobsAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportGetDataFrameJobsStatsAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportPutFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStartFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStopFeatureIndexBuilderJobAction;
Expand All @@ -51,6 +53,7 @@
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobState;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestDeleteFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestGetDataFrameJobsAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestGetDataFrameJobsStatsAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestPutFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStartFeatureIndexBuilderJobAction;
import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStopFeatureIndexBuilderJobAction;
Expand Down Expand Up @@ -118,7 +121,8 @@ public List<RestHandler> getRestHandlers(final Settings settings, final RestCont
new RestStartFeatureIndexBuilderJobAction(settings, restController),
new RestStopFeatureIndexBuilderJobAction(settings, restController),
new RestDeleteFeatureIndexBuilderJobAction(settings, restController),
new RestGetDataFrameJobsAction(settings, restController)
new RestGetDataFrameJobsAction(settings, restController),
new RestGetDataFrameJobsStatsAction(settings, restController)
);
}

Expand All @@ -133,7 +137,8 @@ public List<RestHandler> getRestHandlers(final Settings settings, final RestCont
new ActionHandler<>(StartFeatureIndexBuilderJobAction.INSTANCE, TransportStartFeatureIndexBuilderJobAction.class),
new ActionHandler<>(StopFeatureIndexBuilderJobAction.INSTANCE, TransportStopFeatureIndexBuilderJobAction.class),
new ActionHandler<>(DeleteFeatureIndexBuilderJobAction.INSTANCE, TransportDeleteFeatureIndexBuilderJobAction.class),
new ActionHandler<>(GetDataFrameJobsAction.INSTANCE, TransportGetDataFrameJobsAction.class)
new ActionHandler<>(GetDataFrameJobsAction.INSTANCE, TransportGetDataFrameJobsAction.class),
new ActionHandler<>(GetDataFrameJobsStatsAction.INSTANCE, TransportGetDataFrameJobsStatsAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.featureindexbuilder.action;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobState;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.DataFrameIndexerJobStats;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob;

import java.io.IOException;
import java.util.Objects;

public class DataFrameJobStateAndStats implements Writeable, ToXContentObject {

public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField STATS_FIELD = new ParseField("stats");

private final String id;
private final FeatureIndexBuilderJobState jobState;
private final DataFrameIndexerJobStats jobStats;

public static final ConstructingObjectParser<DataFrameJobStateAndStats, Void> PARSER = new ConstructingObjectParser<>(
GetDataFrameJobsAction.NAME,
a -> new DataFrameJobStateAndStats((String) a[0], (FeatureIndexBuilderJobState) a[1], (DataFrameIndexerJobStats) a[2]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), FeatureIndexBuilderJob.ID);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), FeatureIndexBuilderJobState.PARSER::apply, STATE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> DataFrameIndexerJobStats.fromXContent(p), STATS_FIELD);
}

public DataFrameJobStateAndStats(String id, FeatureIndexBuilderJobState state, DataFrameIndexerJobStats stats) {
this.id = Objects.requireNonNull(id);
this.jobState = Objects.requireNonNull(state);
this.jobStats = Objects.requireNonNull(stats);
}

public DataFrameJobStateAndStats(StreamInput in) throws IOException {
this.id = in.readString();
this.jobState = new FeatureIndexBuilderJobState(in);
this.jobStats = new DataFrameIndexerJobStats(in);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FeatureIndexBuilderJob.ID.getPreferredName(), id);
builder.field(STATE_FIELD.getPreferredName(), jobState);
builder.field(STATS_FIELD.getPreferredName(), jobStats);
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
jobState.writeTo(out);
jobStats.writeTo(out);
}

@Override
public int hashCode() {
return Objects.hash(id, jobState, jobStats);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

DataFrameJobStateAndStats that = (DataFrameJobStateAndStats) other;

return Objects.equals(this.id, that.id) && Objects.equals(this.jobState, that.jobState)
&& Objects.equals(this.jobStats, that.jobStats);
}

public String getId() {
return id;
}

public DataFrameIndexerJobStats getJobStats() {
return jobStats;
}

public FeatureIndexBuilderJobState getJobState() {
return jobState;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ml.featureindexbuilder.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class GetDataFrameJobsStatsAction extends Action<GetDataFrameJobsStatsAction.Response> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For anomaly detection, the equivalent action is called GetJobStatsAction where job is singular. We might want to be consistent but I don't feel strongly about it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this with @droberts195 on a previous PR and we went all for plural, all the endpoint have changed to .../jobs/{id}/... GetDataFrameJob changed to GetDataFrameJobs, etc.

Anyway, it's a good point and I suggest to keep it as is for now but have a session about naming where we can go over all endpoints and then do a renaming PR.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ML actions are inconsistent in this respect: we have GetJobsStatsAction and GetDatafeedsStatsAction but GetJobStatsActionRequest and RestGetJobStatsAction. I think plural is correct.


public static final GetDataFrameJobsStatsAction INSTANCE = new GetDataFrameJobsStatsAction();
public static final String NAME = "cluster:monitor/data_frame_stats/get";
public static final ParseField COUNT = new ParseField("count");
public static final ParseField JOBS = new ParseField("jobs");

public GetDataFrameJobsStatsAction() {
super(NAME);
}

@Override
public Response newResponse() {
return new Response();
}

public static class Request extends BaseTasksRequest<Request> implements ToXContent {
private String id;

public Request(String id) {
if (Strings.isNullOrEmpty(id) || id.equals("*")) {
this.id = MetaData.ALL;
} else {
this.id = id;
}
}

public Request() {}

@Override
public boolean match(Task task) {
// If we are retrieving all the jobs, the task description does not contain the id
if (id.equals(MetaData.ALL)) {
return task.getDescription().startsWith(FeatureIndexBuilderJob.PERSISTENT_TASK_DESCRIPTION_PREFIX);
}
// Otherwise find the task by ID
return task.getDescription().equals(FeatureIndexBuilderJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
}

public String getId() {
return id;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(FeatureIndexBuilderJob.ID.getPreferredName(), id);
return builder;
}

@Override
public int hashCode() {
return Objects.hash(id);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
}
}

public static class RequestBuilder extends ActionRequestBuilder<Request, Response> {

protected RequestBuilder(ElasticsearchClient client, GetDataFrameJobsStatsAction action) {
super(client, action, new Request());
}
}

public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private List<DataFrameJobStateAndStats> jobsStateAndStats;

public Response(List<DataFrameJobStateAndStats> jobsStateAndStats) {
super(Collections.emptyList(), Collections.emptyList());
this.jobsStateAndStats = jobsStateAndStats;
}

public Response(List<DataFrameJobStateAndStats> jobsStateAndStats, List<TaskOperationFailure> taskFailures,
List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.jobsStateAndStats = jobsStateAndStats;
}

public Response() {
super(Collections.emptyList(), Collections.emptyList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also init jobsStateAndStats to empty list?

this.jobsStateAndStats = Collections.emptyList();
}

public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}

public List<DataFrameJobStateAndStats> getJobsStateAndStats() {
return jobsStateAndStats;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobsStateAndStats = in.readList(DataFrameJobStateAndStats::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(jobsStateAndStats);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(COUNT.getPreferredName(), jobsStateAndStats.size());
builder.field(JOBS.getPreferredName(), jobsStateAndStats);
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(jobsStateAndStats);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

final Response that = (Response) other;
return Objects.equals(this.jobsStateAndStats, that.jobsStateAndStats);
}

@Override
public final String toString() {
return Strings.toString(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsAction.Request;
import org.elasticsearch.xpack.ml.featureindexbuilder.action.GetDataFrameJobsAction.Response;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobConfig;
import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask;
import org.elasticsearch.xpack.ml.featureindexbuilder.persistence.DataFramePersistentTaskUtils;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -80,7 +79,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
final DiscoveryNodes nodes = state.nodes();

if (nodes.isLocalNodeElectedMaster()) {
if (stateHasDataFrameJobs(request, state)) {
if (DataFramePersistentTaskUtils.stateHasDataFrameJobs(request.getId(), state)) {
super.doExecute(task, request, listener);
} else {
// If we couldn't find the job in the persistent task CS, it means it was deleted prior to this GET
Expand All @@ -100,28 +99,4 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
}
}
}

/**
* Check to see if the PersistentTask's cluster state contains the job(s) we are interested in
*/
static boolean stateHasDataFrameJobs(Request request, ClusterState state) {
boolean hasJobs = false;
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

if (pTasksMeta != null) {
// If the request was for _all jobs, we need to look through the list of
// persistent tasks and see if at least once has a DataFrameJob param
if (request.getId().equals(MetaData.ALL)) {
hasJobs = pTasksMeta.tasks()
.stream()
.anyMatch(persistentTask -> persistentTask.getTaskName().equals(FeatureIndexBuilder.TASK_NAME));

} else if (pTasksMeta.getTask(request.getId()) != null) {
// If we're looking for a single job, we can just check directly
hasJobs = true;
}
}
return hasJobs;
}

}
Loading