diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java index 01537d493c3e4..e1e384b57415f 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/FeatureIndexBuilder.java @@ -19,26 +19,36 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.DeleteFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.PutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.StopFeatureIndexBuilderJobAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportDeleteFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStartFeatureIndexBuilderJobAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.TransportStopFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobPersistentTasksExecutor; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobState; +import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestDeleteFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestPutFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStartFeatureIndexBuilderJobAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.rest.action.RestStopFeatureIndexBuilderJobAction; import java.time.Clock; import java.util.ArrayList; @@ -64,22 +74,30 @@ public class FeatureIndexBuilder extends Plugin implements ActionPlugin, Persist private final boolean enabled; private final Settings settings; + private final boolean transportClientMode; public FeatureIndexBuilder(Settings settings) { this.settings = settings; // todo: XPackSettings.FEATURE_INDEX_BUILDER_ENABLED.get(settings); this.enabled = true; + this.transportClientMode = XPackPlugin.transportClientMode(settings); } @Override public Collection createGuiceModules() { List modules = new ArrayList<>(); + if (transportClientMode) { + return modules; + } + modules.add(b -> XPackPlugin.bindFeatureSet(b, FeatureIndexBuilderFeatureSet.class)); return modules; } + protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } + @Override public List getRestHandlers(final Settings settings, final RestController restController, final ClusterSettings clusterSettings, final IndexScopedSettings indexScopedSettings, final SettingsFilter settingsFilter, @@ -91,7 +109,9 @@ public List getRestHandlers(final Settings settings, final RestCont return Arrays.asList( new RestPutFeatureIndexBuilderJobAction(settings, restController), - new RestStartFeatureIndexBuilderJobAction(settings, restController) + new RestStartFeatureIndexBuilderJobAction(settings, restController), + new RestStopFeatureIndexBuilderJobAction(settings, restController), + new RestDeleteFeatureIndexBuilderJobAction(settings, restController) ); } @@ -103,13 +123,15 @@ public List getRestHandlers(final Settings settings, final RestCont return Arrays.asList( new ActionHandler<>(PutFeatureIndexBuilderJobAction.INSTANCE, TransportPutFeatureIndexBuilderJobAction.class), - new ActionHandler<>(StartFeatureIndexBuilderJobAction.INSTANCE, TransportStartFeatureIndexBuilderJobAction.class) + new ActionHandler<>(StartFeatureIndexBuilderJobAction.INSTANCE, TransportStartFeatureIndexBuilderJobAction.class), + new ActionHandler<>(StopFeatureIndexBuilderJobAction.INSTANCE, TransportStopFeatureIndexBuilderJobAction.class), + new ActionHandler<>(DeleteFeatureIndexBuilderJobAction.INSTANCE, TransportDeleteFeatureIndexBuilderJobAction.class) ); } @Override public List> getExecutorBuilders(Settings settings) { - if (false == enabled) { + if (false == enabled || transportClientMode) { return emptyList(); } @@ -122,7 +144,7 @@ public List> getExecutorBuilders(Settings settings) { @Override public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client) { - if (enabled == false) { + if (enabled == false || transportClientMode) { return emptyList(); } @@ -130,14 +152,19 @@ public List> getPersistentTasksExecutor(ClusterServic return Collections.singletonList(new FeatureIndexBuilderJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool)); } + @Override public List getNamedXContent() { if (enabled == false) { return emptyList(); } - return Collections.singletonList( + return Arrays.asList( new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField("xpack/feature_index_builder/job"), - FeatureIndexBuilderJob::fromXContent) + FeatureIndexBuilderJob::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(FeatureIndexBuilderJobState.NAME), + FeatureIndexBuilderJobState::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(FeatureIndexBuilderJobState.NAME), + FeatureIndexBuilderJobState::fromXContent) ); } } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DeleteFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DeleteFeatureIndexBuilderJobAction.java new file mode 100644 index 0000000000000..bdb4ef32a5d2c --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DeleteFeatureIndexBuilderJobAction.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + +import java.io.IOException; +import java.util.Objects; + +public class DeleteFeatureIndexBuilderJobAction extends Action { + + public static final DeleteFeatureIndexBuilderJobAction INSTANCE = new DeleteFeatureIndexBuilderJobAction(); + public static final String NAME = "cluster:admin/xpack/feature_index_builder/delete"; + + private DeleteFeatureIndexBuilderJobAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest implements ToXContent { + private String id; + + public Request(String id) { + this.id = ExceptionsHelper.requireNonNull(id, FeatureIndexBuilderJob.ID.getPreferredName()); + } + + public Request() { + } + + public String getId() { + return id; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + id = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(FeatureIndexBuilderJob.ID.getPreferredName(), id); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(id, other.id); + } + } + + public static class RequestBuilder extends MasterNodeOperationRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, DeleteFeatureIndexBuilderJobAction action) { + super(client, action, new Request()); + } + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StartFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StartFeatureIndexBuilderJobAction.java index 61312538fff99..6fcfdf4e8aa71 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StartFeatureIndexBuilderJobAction.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StartFeatureIndexBuilderJobAction.java @@ -19,7 +19,8 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.core.rollup.RollupField; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + import java.io.IOException; import java.util.Collections; import java.util.Objects; @@ -42,7 +43,7 @@ public static class Request extends BaseTasksRequest implements ToXCont private String id; public Request(String id) { - this.id = ExceptionsHelper.requireNonNull(id, RollupField.ID.getPreferredName()); + this.id = ExceptionsHelper.requireNonNull(id, FeatureIndexBuilderJob.ID.getPreferredName()); } public Request() { @@ -71,7 +72,7 @@ public ActionRequestValidationException validate() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(RollupField.ID.getPreferredName(), id); + builder.field(FeatureIndexBuilderJob.ID.getPreferredName(), id); return builder; } @@ -92,7 +93,7 @@ public boolean equals(Object obj) { return Objects.equals(id, other.id); } } - + public static class RequestBuilder extends ActionRequestBuilder { protected RequestBuilder(ElasticsearchClient client, StartFeatureIndexBuilderJobAction action) { @@ -142,12 +143,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - public boolean equals(Object o) { - if (this == o) + public boolean equals(Object obj) { + if (this == obj) { return true; - if (o == null || getClass() != o.getClass()) + } + + if (obj == null || getClass() != obj.getClass()) { return false; - Response response = (Response) o; + } + Response response = (Response) obj; return started == response.started; } diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StopFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StopFeatureIndexBuilderJobAction.java new file mode 100644 index 0000000000000..49f3db0346246 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StopFeatureIndexBuilderJobAction.java @@ -0,0 +1,180 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + +import java.io.IOException; +import java.util.Collections; +import java.util.Objects; + +public class StopFeatureIndexBuilderJobAction extends Action { + + public static final StopFeatureIndexBuilderJobAction INSTANCE = new StopFeatureIndexBuilderJobAction(); + public static final String NAME = "cluster:admin/xpack/feature_index_builder/stop"; + + private StopFeatureIndexBuilderJobAction() { + super(NAME); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends BaseTasksRequest implements ToXContent { + private String id; + + public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + + static { + PARSER.declareString(Request::setId, FeatureIndexBuilderJob.ID); + } + + public Request(String id) { + this.id = ExceptionsHelper.requireNonNull(id, FeatureIndexBuilderJob.ID.getPreferredName()); + } + + public Request() { + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + id = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(FeatureIndexBuilderJob.ID.getPreferredName(), id); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(id, other.id); + } + + @Override + public boolean match(Task task) { + String expectedDescription = FeatureIndexBuilderJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + id; + + return task.getDescription().equals(expectedDescription); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, StopFeatureIndexBuilderJobAction action) { + super(client, action, new Request()); + } + } + + public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { + + private boolean stopped; + + public Response() { + super(Collections.emptyList(), Collections.emptyList()); + } + + public Response(StreamInput in) throws IOException { + super(Collections.emptyList(), Collections.emptyList()); + readFrom(in); + } + + public Response(boolean stopped) { + super(Collections.emptyList(), Collections.emptyList()); + this.stopped = stopped; + } + + public boolean isStopped() { + return stopped; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + stopped = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(stopped); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("stopped", stopped); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Response response = (Response) o; + return stopped == response.stopped; + } + + @Override + public int hashCode() { + return Objects.hash(stopped); + } + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportDeleteFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportDeleteFeatureIndexBuilderJobAction.java new file mode 100644 index 0000000000000..56c118ac00436 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportDeleteFeatureIndexBuilderJobAction.java @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +public class TransportDeleteFeatureIndexBuilderJobAction + extends TransportMasterNodeAction { + + private final PersistentTasksService persistentTasksService; + + @Inject + public TransportDeleteFeatureIndexBuilderJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + PersistentTasksService persistentTasksService, ClusterService clusterService) { + super(settings, DeleteFeatureIndexBuilderJobAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, DeleteFeatureIndexBuilderJobAction.Request::new); + this.persistentTasksService = persistentTasksService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(DeleteFeatureIndexBuilderJobAction.Request request, ClusterState state, + ActionListener listener) throws Exception { + + String jobId = request.getId(); + TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option + + // Step 1. Cancel the persistent task + persistentTasksService.sendRemoveRequest(jobId, new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + logger.debug("Request to cancel Task for Feature Index Builder job [" + jobId + "] successful."); + + // Step 2. Wait for the task to finish cancellation internally + persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout, + new PersistentTasksService.WaitForPersistentTaskListener() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + logger.debug("Task for Feature Index Builder job [" + jobId + "] successfully canceled."); + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(Exception e) { + logger.error("Error while cancelling task for Feature Index Builder job [" + jobId + + "]." + e); + listener.onFailure(e); + } + + @Override + public void onTimeout(TimeValue timeout) { + String msg = "Stopping of Feature Index Builder job [" + jobId + "] timed out after [" + timeout + "]."; + logger.warn(msg); + listener.onFailure(new ElasticsearchException(msg)); + } + }); + } + + @Override + public void onFailure(Exception e) { + logger.error("Error while requesting to cancel task for Feature Index Builder job [" + jobId + "]" + e); + listener.onFailure(e); + } + }); + + } + + @Override + protected ClusterBlockException checkBlock(DeleteFeatureIndexBuilderJobAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java new file mode 100644 index 0000000000000..68a4fee37fa77 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/TransportStopFeatureIndexBuilderJobAction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJobTask; + +import java.io.IOException; +import java.util.List; + +public class TransportStopFeatureIndexBuilderJobAction extends TransportTasksAction { + + + @Inject + public TransportStopFeatureIndexBuilderJobAction(Settings settings, TransportService transportService, + ActionFilters actionFilters, ClusterService clusterService) { + super(settings, StopFeatureIndexBuilderJobAction.NAME, clusterService, transportService, actionFilters, + StopFeatureIndexBuilderJobAction.Request::new, StopFeatureIndexBuilderJobAction.Response::new, ThreadPool.Names.SAME); + } + + @Override + protected void doExecute(Task task, StopFeatureIndexBuilderJobAction.Request request, ActionListener listener) { + super.doExecute(task, request, listener); + } + + @Override + protected void taskOperation(StopFeatureIndexBuilderJobAction.Request request, + FeatureIndexBuilderJobTask jobTask, + ActionListener listener) { + if (jobTask.getConfig().getId().equals(request.getId())) { + jobTask.stop(listener); + } else { + listener.onFailure(new RuntimeException("ID of feature index builder task [" + jobTask.getConfig().getId() + + "] does not match request's ID [" + request.getId() + "]")); + } + } + + @Override + protected StopFeatureIndexBuilderJobAction.Response newResponse(StopFeatureIndexBuilderJobAction.Request request, List tasks, + List taskOperationFailures, + List failedNodeExceptions) { + + if (taskOperationFailures.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(taskOperationFailures.get(0).getCause()); + } else if (failedNodeExceptions.isEmpty() == false) { + throw org.elasticsearch.ExceptionsHelper + .convertToElastic(failedNodeExceptions.get(0)); + } + + // Either the job doesn't exist (the user didn't create it yet) or was deleted after the Stop API executed. + // In either case, let the user know + if (tasks.size() == 0) { + throw new ResourceNotFoundException("Task for Feature Index Builder Job [" + request.getId() + "] not found"); + } + + assert tasks.size() == 1; + + boolean allStopped = tasks.stream().allMatch(StopFeatureIndexBuilderJobAction.Response::isStopped); + return new StopFeatureIndexBuilderJobAction.Response(allStopped); + } + + @Override + protected StopFeatureIndexBuilderJobAction.Response readTaskResponse(StreamInput in) throws IOException { + return new StopFeatureIndexBuilderJobAction.Response(in); + } + +} \ No newline at end of file diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java index a1edfca2684a4..368a9e0897a05 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJob.java @@ -22,8 +22,12 @@ public class FeatureIndexBuilderJob implements XPackPlugin.XPackPersistentTaskParams { + public static final ParseField ID = new ParseField("id"); public static final String NAME = "xpack/feature_index_builder/job"; + // note: this is used to match tasks + public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "feature_index_builder-"; + private FeatureIndexBuilderJobConfig config; private static final ParseField CONFIG = new ParseField("config"); diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java index 5634910e6c529..580a3fe81ce30 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobState.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.featureindexbuilder.job; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -18,18 +19,30 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; +import java.util.SortedMap; +import java.util.TreeMap; + import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class FeatureIndexBuilderJobState implements Task.Status, PersistentTaskState { public static final String NAME = "xpack/feature_index_builder/job"; private final IndexerState state; + @Nullable + private final SortedMap currentPosition; + private static final ParseField STATE = new ParseField("job_state"); + private static final ParseField CURRENT_POSITION = new ParseField("current_position"); + @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - args -> new FeatureIndexBuilderJobState((IndexerState) args[0])); + args -> new FeatureIndexBuilderJobState((IndexerState) args[0], (HashMap) args[1])); static { PARSER.declareField(constructorArg(), p -> { @@ -37,21 +50,38 @@ public class FeatureIndexBuilderJobState implements Task.Status, PersistentTaskS return IndexerState.fromString(p.text()); } throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, STATE, ObjectParser.ValueType.STRING); + PARSER.declareField(optionalConstructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.START_OBJECT) { + return p.map(); + } + if (p.currentToken() == XContentParser.Token.VALUE_NULL) { + return null; + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY); } - public FeatureIndexBuilderJobState(IndexerState state) { + public FeatureIndexBuilderJobState(IndexerState state, @Nullable Map position) { this.state = state; + this.currentPosition = Collections.unmodifiableSortedMap(position == null ? null : new TreeMap<>(position)); + } public FeatureIndexBuilderJobState(StreamInput in) throws IOException { state = IndexerState.fromStream(in); + currentPosition = in.readBoolean() ? Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap())) : null; } public IndexerState getJobState() { return state; } + public Map getPosition() { + return currentPosition; + } + public static FeatureIndexBuilderJobState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -64,6 +94,9 @@ public static FeatureIndexBuilderJobState fromXContent(XContentParser parser) { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(STATE.getPreferredName(), state.value()); + if (currentPosition != null) { + builder.field(CURRENT_POSITION.getPreferredName(), currentPosition); + } builder.endObject(); return builder; } @@ -76,6 +109,10 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { state.writeTo(out); + out.writeBoolean(currentPosition != null); + if (currentPosition != null) { + out.writeMap(currentPosition); + } } @Override @@ -90,11 +127,11 @@ public boolean equals(Object other) { FeatureIndexBuilderJobState that = (FeatureIndexBuilderJobState) other; - return Objects.equals(this.state, that.state); + return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition); } @Override public int hashCode() { - return Objects.hash(state); + return Objects.hash(state, currentPosition); } } \ No newline at end of file diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java index 381e57e9027ca..173b9125d33ee 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderJobTask.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction; import org.elasticsearch.xpack.ml.featureindexbuilder.action.StartFeatureIndexBuilderJobAction.Response; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.StopFeatureIndexBuilderJobAction; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -41,7 +42,7 @@ public class FeatureIndexBuilderJobTask extends AllocatedPersistentTask implemen public FeatureIndexBuilderJobTask(long id, String type, String action, TaskId parentTask, FeatureIndexBuilderJob job, FeatureIndexBuilderJobState state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { - super(id, type, action, "" + "_" + job.getConfig().getId(), parentTask, headers); + super(id, type, action, FeatureIndexBuilderJob.PERSISTENT_TASK_DESCRIPTION_PREFIX + job.getConfig().getId(), parentTask, headers); this.job = job; this.threadPool = threadPool; logger.info("construct job task"); @@ -56,10 +57,17 @@ public FeatureIndexBuilderJobConfig getConfig() { } public synchronized void start(ActionListener listener) { + // TODO: safeguards missing, see rollup code indexer.start(); listener.onResponse(new StartFeatureIndexBuilderJobAction.Response(true)); } + public void stop(ActionListener listener) { + // TODO: safeguards missing, see rollup code + indexer.stop(); + listener.onResponse(new StopFeatureIndexBuilderJobAction.Response(true)); + } + @Override public void triggered(Event event) { if (event.getJobName().equals(SCHEDULE_NAME + "_" + job.getConfig().getId())) { @@ -95,9 +103,7 @@ protected void doSaveState(IndexerState indexerState, Map positi // If we're aborting, just invoke `next` (which is likely an onFailure handler) next.run(); } else { - // to be implemented - - final FeatureIndexBuilderJobState state = new FeatureIndexBuilderJobState(indexerState); + final FeatureIndexBuilderJobState state = new FeatureIndexBuilderJobState(indexerState, getPosition()); logger.info("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]"); // TODO: we can not persist the state right now, need to be called from the task diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestDeleteFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestDeleteFeatureIndexBuilderJobAction.java new file mode 100644 index 0000000000000..d4f0b542d4896 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestDeleteFeatureIndexBuilderJobAction.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.featureindexbuilder.rest.action; + + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.DeleteFeatureIndexBuilderJobAction; + +import java.io.IOException; + +public class RestDeleteFeatureIndexBuilderJobAction extends BaseRestHandler { + public static final ParseField ID = new ParseField("id"); + + public RestDeleteFeatureIndexBuilderJobAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.DELETE, FeatureIndexBuilder.BASE_PATH + "job/{id}/", this); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String id = restRequest.param(ID.getPreferredName()); + DeleteFeatureIndexBuilderJobAction.Request request = new DeleteFeatureIndexBuilderJobAction.Request(id); + + return channel -> client.execute(DeleteFeatureIndexBuilderJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + + @Override + public String getName() { + return "feature_index_builder_delete_job_action"; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestStopFeatureIndexBuilderJobAction.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestStopFeatureIndexBuilderJobAction.java new file mode 100644 index 0000000000000..f20ca65c766bf --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/rest/action/RestStopFeatureIndexBuilderJobAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.featureindexbuilder.rest.action; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.featureindexbuilder.FeatureIndexBuilder; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.StopFeatureIndexBuilderJobAction; +import org.elasticsearch.xpack.ml.featureindexbuilder.job.FeatureIndexBuilderJob; + +import java.io.IOException; + +public class RestStopFeatureIndexBuilderJobAction extends BaseRestHandler { + + public RestStopFeatureIndexBuilderJobAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, FeatureIndexBuilder.BASE_PATH + "job/{id}/_stop", this); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String id = restRequest.param(FeatureIndexBuilderJob.ID.getPreferredName()); + StopFeatureIndexBuilderJobAction.Request request = new StopFeatureIndexBuilderJobAction.Request(id); + + return channel -> client.execute(StopFeatureIndexBuilderJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + + @Override + public String getName() { + return "feature_index_builder_stop_job_action"; + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DeleteFeatureIndexBuilderJobActionRequestTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DeleteFeatureIndexBuilderJobActionRequestTests.java new file mode 100644 index 0000000000000..81eda8351f614 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/DeleteFeatureIndexBuilderJobActionRequestTests.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.DeleteFeatureIndexBuilderJobAction.Request; + +public class DeleteFeatureIndexBuilderJobActionRequestTests extends AbstractStreamableTestCase { + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLengthBetween(1, 20)); + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StopFeatureIndexBuilderJobActionRequestTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StopFeatureIndexBuilderJobActionRequestTests.java new file mode 100644 index 0000000000000..89c61a021f2b2 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/action/StopFeatureIndexBuilderJobActionRequestTests.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.action; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractStreamableXContentTestCase; +import org.elasticsearch.xpack.ml.featureindexbuilder.action.StopFeatureIndexBuilderJobAction.Request; +import java.io.IOException; + +public class StopFeatureIndexBuilderJobActionRequestTests extends AbstractStreamableXContentTestCase { + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLengthBetween(1, 10)); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected Request doParseInstance(XContentParser parser) throws IOException { + return Request.PARSER.parse(parser, null); + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } +}