From 02b284172172db50f8d4f4df6c71db140e8b137a Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 17 Oct 2018 16:18:40 -0400 Subject: [PATCH 1/7] [Rollup] Job deletion should be invoked on allocated task We should delete a job by directly talking to the allocated task and telling it to shutdown. Today we shut down a job via the persistent task framework. This is not ideal because, while the job has been removed from the persistent task CS, the allocated task continues to live until it gets the shutdown message. This means a user can delete a job, immediately delete the rollup index, and then see new documents appear in the just-deleted index. This happens because the indexer in the allocated task is still running and indexes a few more documents before getting the shutdown command. In this PR, the transport action is changed to a TransportTasksAction, and we invoke `onCancelled()` directly on the matching job. There is still a potential for a race if a bulk happens right before the task shuts down, the async bulk will complete and cause the same situation. The lag between signal and stop is shorter, so the chance is less, but it still exists. We'll fix this in a followup PR. --- .../rollup/action/DeleteRollupJobAction.java | 95 ++++++++++-- .../TransportDeleteRollupJobAction.java | 135 +++++++++--------- .../xpack/rollup/job/RollupJobTask.java | 3 +- .../rest-api-spec/test/rollup/delete_job.yml | 2 +- 4 files changed, 151 insertions(+), 84 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java index df2c70c76539e..36a3088a8e75c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java @@ -7,22 +7,29 @@ import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; +import org.elasticsearch.action.support.tasks.BaseTasksRequest; +import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.rollup.RollupField; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Objects; -public class DeleteRollupJobAction extends Action { +public class DeleteRollupJobAction extends Action { public static final DeleteRollupJobAction INSTANCE = new DeleteRollupJobAction(); public static final String NAME = "cluster:admin/xpack/rollup/delete"; @@ -32,11 +39,11 @@ private DeleteRollupJobAction() { } @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + public Response newResponse() { + return new Response(); } - public static class Request extends AcknowledgedRequest implements ToXContent { + public static class Request extends BaseTasksRequest implements ToXContentFragment { private String id; public Request(String id) { @@ -45,6 +52,11 @@ public Request(String id) { public Request() {} + @Override + public boolean match(Task task) { + return task.getDescription().equals(RollupField.NAME + "_" + id); + } + public String getId() { return id; } @@ -90,10 +102,71 @@ public boolean equals(Object obj) { } } - public static class RequestBuilder extends MasterNodeOperationRequestBuilder { - + public static class RequestBuilder extends ActionRequestBuilder { protected RequestBuilder(ElasticsearchClient client, DeleteRollupJobAction action) { - super(client, action, new Request()); + super(client, action, new DeleteRollupJobAction.Request()); + } + } + + public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { + + private boolean acknowledged; + + public Response(StreamInput in) throws IOException { + super(Collections.emptyList(), Collections.emptyList()); + readFrom(in); + } + + public Response(boolean acknowledged, List taskFailures, List nodeFailures) { + super(taskFailures, nodeFailures); + this.acknowledged = acknowledged; + } + + public Response(boolean acknowledged) { + super(Collections.emptyList(), Collections.emptyList()); + this.acknowledged = acknowledged; + } + + public Response() { + super(Collections.emptyList(), Collections.emptyList()); + this.acknowledged = false; + } + + public boolean isDeleted() { + return acknowledged; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + acknowledged = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(acknowledged); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("acknowledged", acknowledged); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o; + return acknowledged == response.acknowledged; + } + + @Override + public int hashCode() { + return Objects.hash(acknowledged); } } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java index 97b4483b1ff03..28a4be5c0061f 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java @@ -5,103 +5,96 @@ */ package org.elasticsearch.xpack.rollup.action; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; +import org.elasticsearch.xpack.rollup.job.RollupJobTask; -import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.io.IOException; +import java.util.List; -public class TransportDeleteRollupJobAction - extends TransportMasterNodeAction { - - private final PersistentTasksService persistentTasksService; +public class TransportDeleteRollupJobAction extends TransportTasksAction { @Inject - public TransportDeleteRollupJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - PersistentTasksService persistentTasksService, ClusterService clusterService) { - super(settings, DeleteRollupJobAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, DeleteRollupJobAction.Request::new); - this.persistentTasksService = persistentTasksService; + public TransportDeleteRollupJobAction(Settings settings, TransportService transportService, + ActionFilters actionFilters, ClusterService clusterService) { + super(settings, DeleteRollupJobAction.NAME, clusterService, transportService, actionFilters, + DeleteRollupJobAction.Request::new, DeleteRollupJobAction.Response::new, ThreadPool.Names.SAME); } @Override - protected String executor() { - return ThreadPool.Names.SAME; - } + protected void doExecute(Task task, DeleteRollupJobAction.Request request, ActionListener listener) { + final ClusterState state = clusterService.state(); + final DiscoveryNodes nodes = state.nodes(); + + if (nodes.isLocalNodeElectedMaster()) { + PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) { + super.doExecute(task, request, listener); + } else { + // If we couldn't find the job in the persistent task CS, it means it was deleted prior to this call, + // no need to go looking for the allocated task + listener.onFailure(new ResourceNotFoundException("the task with id [" + request.getId() + "] doesn't exist")); + } - @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + } else { + // Delegates DeleteJob to elected master node, so it becomes the coordinating node. + // Non-master nodes may have a stale cluster state that shows jobs which are cancelled + // on the master, which makes testing difficult. + if (nodes.getMasterNode() == null) { + listener.onFailure(new MasterNotDiscoveredException("no known master nodes")); + } else { + transportService.sendRequest(nodes.getMasterNode(), actionName, request, + new ActionListenerResponseHandler<>(listener, DeleteRollupJobAction.Response::new)); + } + } } @Override - protected void masterOperation(DeleteRollupJobAction.Request request, ClusterState state, - ActionListener listener) throws Exception { - - String jobId = request.getId(); - TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option - - // Step 1. Cancel the persistent task - persistentTasksService.sendRemoveRequest(jobId, new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - logger.debug("Request to cancel Task for Rollup job [" + jobId + "] successful."); + protected void taskOperation(DeleteRollupJobAction.Request request, RollupJobTask jobTask, + ActionListener listener) { - // Step 2. Wait for the task to finish cancellation internally - persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout, - new PersistentTasksService.WaitForPersistentTaskListener() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - logger.debug("Task for Rollup job [" + jobId + "] successfully canceled."); - listener.onResponse(new AcknowledgedResponse(true)); - } + assert jobTask.getConfig().getId().equals(request.getId()); - @Override - public void onFailure(Exception e) { - logger.error("Error while cancelling task for Rollup job [" + jobId - + "]." + e); - listener.onFailure(e); - } - - @Override - public void onTimeout(TimeValue timeout) { - String msg = "Stopping of Rollup job [" + jobId + "] timed out after [" + timeout + "]."; - logger.warn(msg); - listener.onFailure(new ElasticsearchException(msg)); - } - }); - } - - @Override - public void onFailure(Exception e) { - logger.error("Error while requesting to cancel task for Rollup job [" + jobId - + "]" + e); - listener.onFailure(e); - } - }); + // Tell the task to shut down. TODO if the indexer is running this just signals to shutdown. + // The actual shutdown may be delayed, which we can fix with an await in a followup PR + jobTask.onCancelled(); + listener.onResponse(new DeleteRollupJobAction.Response(true)); + } + @Override + protected DeleteRollupJobAction.Response newResponse(DeleteRollupJobAction.Request request, List tasks, + List taskOperationFailures, + List failedNodeExceptions) { + // There should theoretically only be one task running the rollup job + // If there are more, in production it should be ok as long as they are acknowledge shutting down. + // But in testing we'd like to know there were more than one hence the assert + assert tasks.size() == 1; + boolean cancelled = tasks.stream().anyMatch(DeleteRollupJobAction.Response::isDeleted); + return new DeleteRollupJobAction.Response(cancelled, taskOperationFailures, failedNodeExceptions); } @Override - protected ClusterBlockException checkBlock(DeleteRollupJobAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + protected DeleteRollupJobAction.Response readTaskResponse(StreamInput in) throws IOException { + DeleteRollupJobAction.Response response = new DeleteRollupJobAction.Response(); + response.readFrom(in); + return response; } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index d16f47b1a3503..03ae1880fedf3 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.rollup.Rollup; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -349,7 +350,7 @@ synchronized void shutdown() { * shut down from the inside. */ @Override - protected synchronized void onCancelled() { + public synchronized void onCancelled() { logger.info("Received cancellation request for Rollup job [" + job.getConfig().getId() + "], state: [" + indexer.getState() + "]"); if (indexer.abort()) { // there is no background job running, we can shutdown safely diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml index e5c7c76234022..592d858d54552 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml @@ -180,7 +180,7 @@ setup: "Test delete non-existent job": - do: - catch: /the task with id does_not_exist doesn't exist/ + catch: /the task with id \[does_not_exist\] doesn't exist/ headers: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser xpack.rollup.delete_job: From f4d5befc68bbe38ddf465f522b8a8f14704ffe7c Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 17 Oct 2018 18:23:13 -0400 Subject: [PATCH 2/7] checkstyle --- .../java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 03ae1880fedf3..3fbe77b64b41c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -37,7 +37,6 @@ import org.elasticsearch.xpack.rollup.Rollup; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; From a353baa9c31640d2262587ade84a4a339b3df361 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 18 Oct 2018 14:10:55 -0400 Subject: [PATCH 3/7] Make DeleteJob depend on STOPPED state To simplify the synchronization, we decided that only a STOPPED job can be deleted. This commit enforces that check at the task level. The rest of the changes are just to facilitate percolating this information back to the user: - Common XContent output for failed task/node is moved out of ListTasksResponse and into the superclass for general use - The Rest layer is adjusted to throw a 5xx error if there are failures - Tweak to the acknowledged logic to account for task failures Also updated docs and tests --- .../reference/rollup/apis/delete-job.asciidoc | 4 +-- .../node/tasks/list/ListTasksResponse.java | 24 ----------------- .../support/tasks/BaseTasksResponse.java | 27 +++++++++++++++++++ .../test/rest/ESRestTestCase.java | 27 ++++++++++++++++++- .../rollup/action/DeleteRollupJobAction.java | 5 +++- .../TransportDeleteRollupJobAction.java | 19 ++++++++----- .../rest/RestDeleteRollupJobAction.java | 12 ++++++++- .../rest-api-spec/test/rollup/delete_job.yml | 5 +++- 8 files changed, 86 insertions(+), 37 deletions(-) diff --git a/docs/reference/rollup/apis/delete-job.asciidoc b/docs/reference/rollup/apis/delete-job.asciidoc index 37774560848c5..9fe9c4108679c 100644 --- a/docs/reference/rollup/apis/delete-job.asciidoc +++ b/docs/reference/rollup/apis/delete-job.asciidoc @@ -8,8 +8,8 @@ experimental[] -This API deletes an existing rollup job. The job can be started or stopped, in both cases it will be deleted. Attempting -to delete a non-existing job will throw an exception +This API deletes an existing rollup job. A job must be *stopped* first before it can be deleted. Attempting to delete +a started job will result in an error. Similarly, attempting to delete a nonexistent job will throw an exception. .Deleting the job does not delete rolled up data ********************************** diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index 0ab1391faa211..e6f1c52aae891 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -51,8 +51,6 @@ */ public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject { private static final String TASKS = "tasks"; - private static final String TASK_FAILURES = "task_failures"; - private static final String NODE_FAILURES = "node_failures"; private List tasks; @@ -246,28 +244,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - private void toXContentCommon(XContentBuilder builder, Params params) throws IOException { - if (getTaskFailures() != null && getTaskFailures().size() > 0) { - builder.startArray(TASK_FAILURES); - for (TaskOperationFailure ex : getTaskFailures()){ - builder.startObject(); - builder.value(ex); - builder.endObject(); - } - builder.endArray(); - } - - if (getNodeFailures() != null && getNodeFailures().size() > 0) { - builder.startArray(NODE_FAILURES); - for (ElasticsearchException ex : getNodeFailures()) { - builder.startObject(); - ex.toXContent(builder, params); - builder.endObject(); - } - builder.endArray(); - } - } - public static ListTasksResponse fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java index 1436410bf2046..2fb3be9f3520f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java @@ -25,6 +25,8 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -41,6 +43,9 @@ * Base class for responses of task-related operations */ public class BaseTasksResponse extends ActionResponse { + protected static final String TASK_FAILURES = "task_failures"; + protected static final String NODE_FAILURES = "node_failures"; + private List taskFailures; private List nodeFailures; @@ -103,4 +108,26 @@ public void writeTo(StreamOutput out) throws IOException { exp.writeTo(out); } } + + protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException { + if (getTaskFailures() != null && getTaskFailures().size() > 0) { + builder.startArray(TASK_FAILURES); + for (TaskOperationFailure ex : getTaskFailures()){ + builder.startObject(); + builder.value(ex); + builder.endObject(); + } + builder.endArray(); + } + + if (getNodeFailures() != null && getNodeFailures().size() > 0) { + builder.startArray(NODE_FAILURES); + for (ElasticsearchException ex : getNodeFailures()) { + builder.startObject(); + ex.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 1c1f2a7eee67f..843110d406eb9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -65,11 +65,13 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; import static java.util.Collections.sort; import static java.util.Collections.unmodifiableList; @@ -391,7 +393,7 @@ private void wipeClusterSettings() throws IOException { } } - private void wipeRollupJobs() throws IOException { + private void wipeRollupJobs() throws IOException, InterruptedException { Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all")); Map jobs = entityAsMap(response); @SuppressWarnings("unchecked") @@ -402,6 +404,29 @@ private void wipeRollupJobs() throws IOException { return; } + for (Map jobConfig : jobConfigs) { + @SuppressWarnings("unchecked") + String jobId = (String) ((Map) jobConfig.get("config")).get("id"); + Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop"); + request.addParameter("ignore", "404"); + logger.debug("stopping rollup job [{}]", jobId); + adminClient().performRequest(request); + } + + // TODO this is temporary until StopJob API gains the ability to block until stopped + awaitBusy(() -> { + Request request = new Request("GET", "/_xpack/rollup/job/_all"); + try { + Response jobsResponse = adminClient().performRequest(request); + String body = EntityUtils.toString(jobsResponse.getEntity()); + logger.error(body); + // If the body contains any of the non-stopped states, at least one job is not finished yet + return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains); + } catch (IOException e) { + return false; + } + }, 10, TimeUnit.SECONDS); + for (Map jobConfig : jobConfigs) { @SuppressWarnings("unchecked") String jobId = (String) ((Map) jobConfig.get("config")).get("id"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java index 36a3088a8e75c..930da416e67ad 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java @@ -151,7 +151,10 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("acknowledged", acknowledged); + { + toXContentCommon(builder, params); + builder.field("acknowledged", acknowledged); + } builder.endObject(); return builder; } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java index 28a4be5c0061f..5cdc40df4d699 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java @@ -23,7 +23,9 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; +import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.rollup.job.RollupJobTask; import java.io.IOException; @@ -72,11 +74,14 @@ protected void taskOperation(DeleteRollupJobAction.Request request, RollupJobTas ActionListener listener) { assert jobTask.getConfig().getId().equals(request.getId()); - - // Tell the task to shut down. TODO if the indexer is running this just signals to shutdown. - // The actual shutdown may be delayed, which we can fix with an await in a followup PR - jobTask.onCancelled(); - listener.onResponse(new DeleteRollupJobAction.Response(true)); + IndexerState state = ((RollupJobStatus) jobTask.getStatus()).getIndexerState(); + if (state.equals(IndexerState.STOPPED) ) { + jobTask.onCancelled(); + listener.onResponse(new DeleteRollupJobAction.Response(true)); + } else { + listener.onFailure(new IllegalStateException("Could not delete job [" + request.getId() + "] because " + + "indexer state is [" + state + "]. Job must be [" + IndexerState.STOPPED + "] before deletion.")); + } } @Override @@ -86,8 +91,8 @@ protected DeleteRollupJobAction.Response newResponse(DeleteRollupJobAction.Reque // There should theoretically only be one task running the rollup job // If there are more, in production it should be ok as long as they are acknowledge shutting down. // But in testing we'd like to know there were more than one hence the assert - assert tasks.size() == 1; - boolean cancelled = tasks.stream().anyMatch(DeleteRollupJobAction.Response::isDeleted); + assert tasks.size() + taskOperationFailures.size() == 1; + boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(DeleteRollupJobAction.Response::isDeleted); return new DeleteRollupJobAction.Response(cancelled, taskOperationFailures, failedNodeExceptions); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java index 140b7d9b76943..77d39a45ac52d 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestDeleteRollupJobAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; import org.elasticsearch.xpack.rollup.Rollup; @@ -31,7 +32,16 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient String id = restRequest.param(ID.getPreferredName()); DeleteRollupJobAction.Request request = new DeleteRollupJobAction.Request(id); - return channel -> client.execute(DeleteRollupJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute(DeleteRollupJobAction.INSTANCE, request, + new RestToXContentListener(channel) { + @Override + protected RestStatus getStatus(DeleteRollupJobAction.Response response) { + if (response.getNodeFailures().size() > 0 || response.getTaskFailures().size() > 0) { + return RestStatus.INTERNAL_SERVER_ERROR; + } + return RestStatus.OK; + } + }); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml index 592d858d54552..861be094fa62d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml @@ -172,9 +172,12 @@ setup: - is_true: started - do: + catch: request xpack.rollup.delete_job: id: foo - - is_true: acknowledged + - is_false: acknowledged + - match: { task_failures.0.reason.type: "illegal_state_exception" } + - match: { task_failures.0.reason.reason: "Could not delete job [foo] because indexer state is [STARTED]. Job must be [STOPPED] before deletion." } --- "Test delete non-existent job": From 0e88165bee2af5db70d8787c9a0d4f0a2f53919d Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 18 Oct 2018 14:32:43 -0400 Subject: [PATCH 4/7] checkstyle --- .../main/java/org/elasticsearch/test/rest/ESRestTestCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 843110d406eb9..a4f956a2f91e9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -71,7 +71,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.function.BooleanSupplier; import static java.util.Collections.sort; import static java.util.Collections.unmodifiableList; From 6f54698db9a2a1c24c6be21dd29307447b365480 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 19 Oct 2018 12:39:26 -0400 Subject: [PATCH 5/7] Remove redundant test cleanup Since the rollup cleanup code was moved to ESRestTestCase, this code is redundant (and throws errors since it is out of date) --- .../elasticsearch/multi_node/RollupIT.java | 70 ------------------- 1 file changed, 70 deletions(-) diff --git a/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java index fb9c665b2bf1c..3ea1b8e67471c 100644 --- a/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java +++ b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.multi_node; -import org.apache.http.HttpStatus; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.apache.http.util.EntityUtils; @@ -20,16 +19,10 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath; -import org.junit.After; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; @@ -68,13 +61,6 @@ static Map toMap(String response) throws IOException { return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); } - @After - public void clearRollupMetadata() throws Exception { - deleteAllJobs(); - waitForPendingTasks(); - // indices will be deleted by the ESRestTestCase class - } - public void testBigRollup() throws Exception { final int numDocs = 200; String dateFormat = "strict_date_optional_time"; @@ -293,60 +279,4 @@ private Map getJob(Map jobsMap, String targetJob } return null; } - - private void waitForPendingTasks() throws Exception { - ESTestCase.assertBusy(() -> { - try { - Request request = new Request("GET", "/_cat/tasks"); - request.addParameter("detailed", "true"); - Response response = adminClient().performRequest(request); - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { - try (BufferedReader responseReader = new BufferedReader( - new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { - int activeTasks = 0; - String line; - StringBuilder tasksListString = new StringBuilder(); - while ((line = responseReader.readLine()) != null) { - - // We only care about Rollup jobs, otherwise this fails too easily due to unrelated tasks - if (line.startsWith(RollupJob.NAME) == true) { - activeTasks++; - tasksListString.append(line); - tasksListString.append('\n'); - } - } - assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks); - } - } - } catch (IOException e) { - throw new AssertionError("Error getting active tasks list", e); - } - }); - } - - @SuppressWarnings("unchecked") - private void deleteAllJobs() throws Exception { - Request request = new Request("GET", "/_xpack/rollup/job/_all"); - Response response = adminClient().performRequest(request); - Map jobs = ESRestTestCase.entityAsMap(response); - @SuppressWarnings("unchecked") - List> jobConfigs = - (List>) XContentMapValues.extractValue("jobs", jobs); - - if (jobConfigs == null) { - return; - } - - for (Map jobConfig : jobConfigs) { - logger.debug(jobConfig); - String jobId = (String) ((Map) jobConfig.get("config")).get("id"); - logger.debug("Deleting job " + jobId); - try { - request = new Request("DELETE", "/_xpack/rollup/job/" + jobId); - adminClient().performRequest(request); - } catch (Exception e) { - // ok - } - } - } } From 3bc180e51040fc919b532a88bc335bc5f1ed8bdd Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 22 Oct 2018 13:13:32 -0400 Subject: [PATCH 6/7] Fix equals and hashcode for Delete and BaseTasksResponse --- .../support/tasks/BaseTasksResponse.java | 19 +++++++++++++++++++ .../rollup/action/DeleteRollupJobAction.java | 4 ++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java index 2fb3be9f3520f..090aaf628ac52 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; @@ -130,4 +131,22 @@ protected void toXContentCommon(XContentBuilder builder, ToXContent.Params param builder.endArray(); } } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BaseTasksResponse response = (BaseTasksResponse) o; + return taskFailures.equals(response.taskFailures) + && nodeFailures.equals(response.nodeFailures); + } + + @Override + public int hashCode() { + return Objects.hash(taskFailures, nodeFailures); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java index 930da416e67ad..f1c6213cd70e3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java @@ -164,12 +164,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o; - return acknowledged == response.acknowledged; + return super.equals(o) && acknowledged == response.acknowledged; } @Override public int hashCode() { - return Objects.hash(acknowledged); + return Objects.hash(super.hashCode(), acknowledged); } } } From 7fdd9e7ed50167495533bdcafd3cd59901292e8d Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 22 Oct 2018 17:15:14 -0400 Subject: [PATCH 7/7] Fix doc tests --- docs/reference/rollup/apis/delete-job.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/rollup/apis/delete-job.asciidoc b/docs/reference/rollup/apis/delete-job.asciidoc index 9fe9c4108679c..f649d3ee60d97 100644 --- a/docs/reference/rollup/apis/delete-job.asciidoc +++ b/docs/reference/rollup/apis/delete-job.asciidoc @@ -99,12 +99,12 @@ A 404 `resource_not_found` exception will be thrown: "root_cause" : [ { "type" : "resource_not_found_exception", - "reason" : "the task with id does_not_exist doesn't exist", + "reason" : "the task with id [does_not_exist] doesn't exist", "stack_trace": ... } ], "type" : "resource_not_found_exception", - "reason" : "the task with id does_not_exist doesn't exist", + "reason" : "the task with id [does_not_exist] doesn't exist", "stack_trace": ... }, "status": 404