diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 841b686deff54..db14947c8d3af 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -17,6 +17,8 @@ public final class DataFrameField { public static final ParseField ID = new ParseField("id"); public static final ParseField JOBS = new ParseField("jobs"); public static final ParseField COUNT = new ParseField("count"); + public static final ParseField TIMEOUT = new ParseField("timeout"); + public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion"); // common strings public static final String TASK_NAME = "data_frame/jobs"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java new file mode 100644 index 0000000000000..f7d3514e1a442 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe; + +import java.text.MessageFormat; +import java.util.Locale; + +public class DataFrameMessages { + + public static final String REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT = + "Timed out after [{0}] while waiting for data frame job [{1}] to stop"; + public static final String REST_STOP_JOB_WAIT_FOR_COMPLETION_INTERRUPT = "Interrupted while waiting for data frame job [{0}] to stop"; + + private DataFrameMessages() { + } + + /** + * Returns the message parameter + * + * @param message Should be one of the statics defined in this class + */ + public static String getMessage(String message) { + return message; + } + + /** + * Format the message with the supplied arguments + * + * @param message Should be one of the statics defined in this class + * @param args MessageFormat arguments. See {@linkplain MessageFormat#format(Object)}] + */ + public static String getMessage(String message, Object... args) { + return new MessageFormat(message, Locale.ROOT).format(args); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java new file mode 100644 index 0000000000000..ffe49918e97a3 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe; + +import org.elasticsearch.test.ESTestCase; + +public class DataFrameMessagesTests extends ESTestCase { + + public void testGetMessage_WithFormatStrings() { + String formattedMessage = DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT, "30s", + "my_job"); + assertEquals("Timed out after [30s] while waiting for data frame job [my_job] to stop", formattedMessage); + } +} diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataframePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataframePivotRestIT.java index 7eac6564e34cd..d764737811f0d 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataframePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataframePivotRestIT.java @@ -8,7 +8,6 @@ import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.http.util.EntityUtils; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -20,7 +19,6 @@ import org.junit.AfterClass; import org.junit.Before; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -175,13 +173,20 @@ private void waitForDataFrameGeneration(String jobId) throws Exception { }, 30, TimeUnit.SECONDS); } - private int getDataFrameGeneration(String jobId) throws IOException { + private static int getDataFrameGeneration(String jobId) throws IOException { Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + jobId + "/_stats")); Map jobStatsAsMap = (Map) ((List) entityAsMap(statsResponse).get("jobs")).get(0); return (int) XContentMapValues.extractValue("state.generation", jobStatsAsMap); } + private static String getDataFrameIndexerState(String jobId) throws IOException { + Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + jobId + "/_stats")); + + Map jobStatsAsMap = (Map) ((List) entityAsMap(statsResponse).get("jobs")).get(0); + return (String) XContentMapValues.extractValue("state.job_state", jobStatsAsMap); + } + private void refreshIndex(String index) throws IOException { assertOK(client().performRequest(new Request("POST", index + "/_refresh"))); } @@ -208,25 +213,13 @@ private static void wipeDataFrameJobs() throws IOException, InterruptedException for (Map jobConfig : jobConfigs) { String jobId = (String) jobConfig.get("id"); Request request = new Request("POST", DATAFRAME_ENDPOINT + jobId + "/_stop"); + request.addParameter("wait_for_completion", "true"); + request.addParameter("timeout", "10s"); request.addParameter("ignore", "404"); adminClient().performRequest(request); + assertEquals("stopped", getDataFrameIndexerState(jobId)); } - // TODO this is temporary until the StopDataFrameJob API gains the ability to block until stopped - boolean stopped = awaitBusy(() -> { - Request request = new Request("GET", DATAFRAME_ENDPOINT + "_all"); - try { - Response jobsResponse = adminClient().performRequest(request); - String body = EntityUtils.toString(jobsResponse.getEntity()); - // If the body contains any of the non-stopped states, at least one job is not finished yet - return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains); - } catch (IOException e) { - return false; - } - }, 10, TimeUnit.SECONDS); - - assertTrue("Timed out waiting for data frame job(s) to stop", stopped); - for (Map jobConfig : jobConfigs) { String jobId = (String) jobConfig.get("id"); Request request = new Request("DELETE", DATAFRAME_ENDPOINT + jobId); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobAction.java index 08c517a380c30..b31bde4f843c6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobAction.java @@ -11,10 +11,11 @@ import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -25,12 +26,15 @@ import java.io.IOException; import java.util.Collections; import java.util.Objects; +import java.util.concurrent.TimeUnit; public class StopDataFrameJobAction extends Action { public static final StopDataFrameJobAction INSTANCE = new StopDataFrameJobAction(); public static final String NAME = "cluster:admin/data_frame/stop"; + public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); + private StopDataFrameJobAction() { super(NAME); } @@ -42,23 +46,24 @@ public Response newResponse() { public static class Request extends BaseTasksRequest implements ToXContent { private String id; + private final boolean waitForCompletion; - public static ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); - - static { - PARSER.declareString(Request::setId, DataFrameField.ID); - } - - public Request(String id) { + public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); + this.waitForCompletion = waitForCompletion; + + // use the timeout value already present in BaseTasksRequest + this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout); } public Request() { + this(null, false, null); } public Request(StreamInput in) throws IOException { super(in); id = in.readString(); + waitForCompletion = in.readBoolean(); } public String getId() { @@ -69,10 +74,15 @@ public void setId(String id) { this.id = id; } + public boolean waitForCompletion() { + return waitForCompletion; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); + out.writeBoolean(waitForCompletion); } @Override @@ -83,12 +93,17 @@ public ActionRequestValidationException validate() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(DataFrameField.ID.getPreferredName(), id); + builder.field(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion); + if (this.getTimeout() != null) { + builder.field(DataFrameField.TIMEOUT.getPreferredName(), this.getTimeout()); + } return builder; } @Override public int hashCode() { - return Objects.hash(id); + // the base class does not implement hashCode, therefore we need to hash timeout ourselves + return Objects.hash(id, waitForCompletion, this.getTimeout()); } @Override @@ -101,7 +116,13 @@ public boolean equals(Object obj) { return false; } Request other = (Request) obj; - return Objects.equals(id, other.id); + + // the base class does not implement equals, therefore we need to compare timeout ourselves + if (Objects.equals(this.getTimeout(), other.getTimeout()) == false) { + return false; + } + + return Objects.equals(id, other.id) && Objects.equals(waitForCompletion, other.waitForCompletion); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameJobAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameJobAction.java index 5c0cb444754fe..e76c6dabc3e3c 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameJobAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameJobAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.dataframe.action; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -14,21 +16,30 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.dataframe.job.DataFrameJobTask; import java.util.List; +import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; + public class TransportStopDataFrameJobAction extends TransportTasksAction { + private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); + private final ThreadPool threadPool; + @Inject - public TransportStopDataFrameJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) { + public TransportStopDataFrameJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService, + ThreadPool threadPool) { super(StopDataFrameJobAction.NAME, clusterService, transportService, actionFilters, StopDataFrameJobAction.Request::new, StopDataFrameJobAction.Response::new, StopDataFrameJobAction.Response::new, ThreadPool.Names.SAME); + this.threadPool = threadPool; } @Override @@ -41,7 +52,42 @@ protected void doExecute(Task task, StopDataFrameJobAction.Request request, protected void taskOperation(StopDataFrameJobAction.Request request, DataFrameJobTask jobTask, ActionListener listener) { if (jobTask.getConfig().getId().equals(request.getId())) { - jobTask.stop(listener); + if (request.waitForCompletion() == false) { + jobTask.stop(listener); + } else { + ActionListener blockingListener = ActionListener.wrap(response -> { + if (response.isStopped()) { + // The Task acknowledged that it is stopped/stopping... wait until the status actually + // changes over before returning. Switch over to Generic threadpool so + // we don't block the network thread + threadPool.generic().execute(() -> { + try { + long untilInNanos = System.nanoTime() + request.getTimeout().getNanos(); + + while (System.nanoTime() - untilInNanos < 0) { + if (jobTask.isStopped()) { + listener.onResponse(response); + return; + } + Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); + } + // ran out of time + listener.onFailure(new ElasticsearchTimeoutException( + DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT, + request.getTimeout().getStringRep(), request.getId()))); + } catch (InterruptedException e) { + listener.onFailure(new ElasticsearchException(DataFrameMessages + .getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e)); + } + }); + } else { + // Did not acknowledge stop, just return the response + listener.onResponse(response); + } + }, listener::onFailure); + + jobTask.stop(blockingListener); + } } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + jobTask.getConfig().getId() + "] does not match request's ID [" + request.getId() + "]")); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java index c93f39fc57cb2..9059d863aa913 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java @@ -101,6 +101,10 @@ public long getGeneration() { return generation.get(); } + public boolean isStopped() { + return indexer.getState().equals(IndexerState.STOPPED); + } + public synchronized void start(ActionListener listener) { final IndexerState prevState = indexer.getState(); if (prevState != IndexerState.STOPPED) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameJobAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameJobAction.java index 7f17a388b5b93..f5979264d07fd 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameJobAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameJobAction.java @@ -7,6 +7,7 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; @@ -26,7 +27,10 @@ public RestStopDataFrameJobAction(Settings settings, RestController controller) @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String id = restRequest.param(DataFrameField.ID.getPreferredName()); - StopDataFrameJobAction.Request request = new StopDataFrameJobAction.Request(id); + TimeValue timeout = restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), StopDataFrameJobAction.DEFAULT_TIMEOUT); + boolean waitForCompletion = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), false); + + StopDataFrameJobAction.Request request = new StopDataFrameJobAction.Request(id, waitForCompletion, timeout); return channel -> client.execute(StopDataFrameJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobActionRequestTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobActionRequestTests.java index f7e3faa21a5cd..d5a292fe71aac 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobActionRequestTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobActionRequestTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.dataframe.action; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.dataframe.action.StopDataFrameJobAction.Request; @@ -14,11 +15,23 @@ public class StopDataFrameJobActionRequestTests extends AbstractWireSerializingT @Override protected Request createTestInstance() { - return new Request(randomAlphaOfLengthBetween(1, 10)); + TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null; + return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), timeout); } @Override protected Writeable.Reader instanceReader() { return Request::new; } + + public void testSameButDifferentTimeout() { + String id = randomAlphaOfLengthBetween(1, 10); + boolean waitForCompletion = randomBoolean(); + + Request r1 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(10)); + Request r2 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(20)); + + assertNotEquals(r1,r2); + assertNotEquals(r1.hashCode(),r2.hashCode()); + } }