From 35b49b88bbfb280107e6cc4afad77f5210e7d612 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 24 Oct 2018 11:06:38 -0400 Subject: [PATCH 1/6] [Rollup] Add `wait_for_stopped` option to StopRollupJob API This adds a flag which allows the user to block the Stop API until the task has actually moved to a stopped state, instead of returning immediately. If the flag is unset (default) the behavior is as before: returns immediately. If the flag is set to a TimeValue, it will block the call until the status changes to `STOPPED` (up to the specified time limit). If the time value is exceeded a timeout exception is thrown. Note: The job will still likely move to STOPPED eventually, the timeout just means the blocking call timed out before the state changed. --- docs/reference/rollup/apis/stop-job.asciidoc | 24 ++++++++++ .../test/rest/ESRestTestCase.java | 15 +----- .../rollup/action/StopRollupJobAction.java | 34 +++++++++++-- .../action/TransportStopRollupAction.java | 48 ++++++++++++++++++- .../rollup/rest/RestStopRollupJobAction.java | 8 ++-- .../api/xpack.rollup.stop_job.json | 7 +++ .../rest-api-spec/test/rollup/stop_job.yml | 21 ++++++++ 7 files changed, 135 insertions(+), 22 deletions(-) diff --git a/docs/reference/rollup/apis/stop-job.asciidoc b/docs/reference/rollup/apis/stop-job.asciidoc index 5912b2d688b70..f1adba3de4a2b 100644 --- a/docs/reference/rollup/apis/stop-job.asciidoc +++ b/docs/reference/rollup/apis/stop-job.asciidoc @@ -22,6 +22,14 @@ Stopping an already stopped job has no action. `job_id` (required):: (string) Identifier for the job +==== Query Parameters + +`wait_for_stopped` (optional):: + (TimeValue) If unset (the default), the API will return immediately. If set, the API will block for (at maximum) + the specified duration while waiting for the job to stop. If more than `wait_for_stopped` time has passed, the API + will throw a timeout exception. Note: even if a timeout exception is thrown, the stop request is still processing and + will eventually move the job to `STOPPED`. The timeout simply means the API call itself timed out while waiting + for the status change. ==== Request Body @@ -85,3 +93,19 @@ A 404 `resource_not_found` exception will be thrown: } ---- // TESTRESPONSE[s/"stack_trace": .../"stack_trace": $body.$_path/] + +===== Waiting for the job to stop + +Since only a stopped job can be deleted, it can be useful to block the StopJob API until the indexer has fully +stopped. This is accomplished with the `wait_for_stopped` query parameter: + + +[source,js] +-------------------------------------------------- +POST _xpack/rollup/job/sensor/_stop?wait_for_stopped=10s +-------------------------------------------------- +// CONSOLE +// TEST[setup:sensor_started_rollup_job] + +The parameter will block the API call from returning until either the job has moved to `STOPPED`, or the specified +time has elapsed. If the specified time elapses without the job moving to `STOPPED`, a timeout exception will be thrown. \ No newline at end of file diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 72299dab91245..0499ebeb5d904 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -466,24 +466,11 @@ private void wipeRollupJobs() throws IOException, InterruptedException { String jobId = (String) ((Map) jobConfig.get("config")).get("id"); Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop"); request.addParameter("ignore", "404"); + request.addParameter("wait_for_stopped", "10s"); logger.debug("stopping rollup job [{}]", jobId); adminClient().performRequest(request); } - // TODO this is temporary until StopJob API gains the ability to block until stopped - awaitBusy(() -> { - Request request = new Request("GET", "/_xpack/rollup/job/_all"); - try { - Response jobsResponse = adminClient().performRequest(request); - String body = EntityUtils.toString(jobsResponse.getEntity()); - logger.error(body); - // If the body contains any of the non-stopped states, at least one job is not finished yet - return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains); - } catch (IOException e) { - return false; - } - }, 10, TimeUnit.SECONDS); - for (Map jobConfig : jobConfigs) { @SuppressWarnings("unchecked") String jobId = (String) ((Map) jobConfig.get("config")).get("id"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java index eb48d640f21eb..ed808952d9408 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java @@ -5,15 +5,19 @@ */ package org.elasticsearch.xpack.core.rollup.action; +import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -28,6 +32,7 @@ public class StopRollupJobAction extends Action { public static final StopRollupJobAction INSTANCE = new StopRollupJobAction(); public static final String NAME = "cluster:admin/xpack/rollup/stop"; + public static final ParseField WAIT_FOR_STOPPED = new ParseField("wait_for_stopped"); private StopRollupJobAction() { super(NAME); @@ -40,9 +45,15 @@ public Response newResponse() { public static class Request extends BaseTasksRequest implements ToXContent { private String id; + private TimeValue waitForStopped = null; - public Request(String id) { + public Request (String id) { + this(id, null); + } + + public Request(String id, @Nullable TimeValue waitForStopped) { this.id = ExceptionsHelper.requireNonNull(id, RollupField.ID.getPreferredName()); + this.waitForStopped = waitForStopped; } public Request() {} @@ -51,16 +62,29 @@ public String getId() { return id; } + @Nullable + public TimeValue waitForStopped() { + return waitForStopped; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readString(); + // TODO change this after backport + if (in.getVersion().onOrAfter(Version.CURRENT)) { + waitForStopped = in.readTimeValue(); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); + // TODO change this after backport + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeTimeValue(waitForStopped); + } } @Override @@ -71,12 +95,15 @@ public ActionRequestValidationException validate() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(RollupField.ID.getPreferredName(), id); + if (waitForStopped != null) { + builder.field(WAIT_FOR_STOPPED.getPreferredName(), waitForStopped); + } return builder; } @Override public int hashCode() { - return Objects.hash(id); + return Objects.hash(id, waitForStopped); } @Override @@ -88,7 +115,8 @@ public boolean equals(Object obj) { return false; } Request other = (Request) obj; - return Objects.equals(id, other.id); + return Objects.equals(id, other.id) + && Objects.equals(waitForStopped, other.waitForStopped); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java index ee55d8081feb8..282dd4cb735d2 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.rollup.action; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; @@ -15,14 +16,19 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; +import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.rollup.job.RollupJobTask; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; public class TransportStopRollupAction extends TransportTasksAction listener) { if (jobTask.getConfig().getId().equals(request.getId())) { jobTask.stop(listener); + waitForStopped(request, jobTask, listener); } else { listener.onFailure(new RuntimeException("ID of rollup task [" + jobTask.getConfig().getId() + "] does not match request's ID [" + request.getId() + "]")); } } + private static void waitForStopped(StopRollupJobAction.Request request, + RollupJobTask jobTask, + ActionListener listener) { + if (request.waitForStopped() != null) { + try { + boolean stopped = awaitBusy(() -> ((RollupJobStatus) jobTask.getStatus()).getIndexerState().equals(IndexerState.STOPPED), + request.waitForStopped()); + + if (stopped == false) { + listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.waitForStopped().getStringRep() + + "] while waiting for rollup job [" + request.getId() + "] to stop")); + } + } catch (InterruptedException e) { + listener.onFailure(e); + } + } + } + + /** + * Lifted from ESTestCase, must stay private and do not reuse! This is temporary until + * the Rollup state refactor makes it unnecessary to await on a status change + */ + private static boolean awaitBusy(BooleanSupplier breakSupplier, TimeValue maxWaitTime) throws InterruptedException { + long maxTimeInMillis = maxWaitTime.getMillis(); + long timeInMillis = 1; + long sum = 0; + while (sum + timeInMillis < maxTimeInMillis) { + if (breakSupplier.getAsBoolean()) { + return true; + } + Thread.sleep(timeInMillis); + sum += timeInMillis; + timeInMillis = Math.min(1000L, timeInMillis * 2); + } + timeInMillis = maxTimeInMillis - sum; + Thread.sleep(Math.max(timeInMillis, 0)); + return breakSupplier.getAsBoolean(); + } + @Override protected StopRollupJobAction.Response newResponse(StopRollupJobAction.Request request, List tasks, List taskOperationFailures, @@ -88,4 +134,4 @@ protected StopRollupJobAction.Response readTaskResponse(StreamInput in) throws I return new StopRollupJobAction.Response(in); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java index 0f03cdd1ebe2c..60b83e5dc54e4 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java @@ -7,6 +7,7 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; @@ -15,8 +16,6 @@ import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; import org.elasticsearch.xpack.rollup.Rollup; -import java.io.IOException; - public class RestStopRollupJobAction extends BaseRestHandler { public RestStopRollupJobAction(Settings settings, RestController controller) { @@ -25,9 +24,10 @@ public RestStopRollupJobAction(Settings settings, RestController controller) { } @Override - protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { String id = restRequest.param(RollupField.ID.getPreferredName()); - StopRollupJobAction.Request request = new StopRollupJobAction.Request(id); + TimeValue waitForStopped = restRequest.paramAsTime(StopRollupJobAction.WAIT_FOR_STOPPED.getPreferredName(), null); + StopRollupJobAction.Request request = new StopRollupJobAction.Request(id, waitForStopped); return channel -> client.execute(StopRollupJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json index 39c3fd4a11359..6f6ce6b6e317d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json @@ -11,6 +11,13 @@ "required": true, "description": "The ID of the job to stop" } + }, + "params": { + "wait_for_stopped": { + "type": "time", + "required": false, + "description": "If set, the API will block for (at maximum) the specified duration while waiting for the job to stop. If unset, returns immediately." + } } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml index 849aca3332dfe..1384a288d2105 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml @@ -82,3 +82,24 @@ setup: - is_true: stopped +--- +"Test wait_for_stopped": + - skip: + version: " - 6.5.0" + reason: wait_for_stopped option was added in 6.5 + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + xpack.rollup.start_job: + id: foo + - is_true: started + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + xpack.rollup.stop_job: + id: foo + wait_for_stopped: 5s + - is_true: stopped + From e9c05c30c9d3b58eec483ee9c9071eb298b4b050 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 24 Oct 2018 11:50:57 -0400 Subject: [PATCH 2/6] checkstyle --- .../main/java/org/elasticsearch/test/rest/ESRestTestCase.java | 2 -- .../xpack/rollup/action/TransportStopRollupAction.java | 1 - 2 files changed, 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 0499ebeb5d904..704695846c178 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -54,7 +54,6 @@ import org.junit.Before; import javax.net.ssl.SSLContext; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -68,7 +67,6 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java index 282dd4cb735d2..6b3d40db71905 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Consumer; From a288fecbb1016fcff1c39ee6e6d1364895704c17 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 29 Oct 2018 14:09:57 -0400 Subject: [PATCH 3/6] Replace with `wait_for_completion` and `timeout` --- docs/reference/rollup/apis/stop-job.asciidoc | 20 ++++-- .../test/rest/ESRestTestCase.java | 3 +- .../rollup/action/StopRollupJobAction.java | 40 +++++++---- .../action/TransportStopRollupAction.java | 67 +++++++++++-------- .../rollup/rest/RestStopRollupJobAction.java | 5 +- .../api/xpack.rollup.stop_job.json | 9 ++- .../rest-api-spec/test/rollup/stop_job.yml | 26 ++++++- 7 files changed, 114 insertions(+), 56 deletions(-) diff --git a/docs/reference/rollup/apis/stop-job.asciidoc b/docs/reference/rollup/apis/stop-job.asciidoc index f1adba3de4a2b..b53d676582e51 100644 --- a/docs/reference/rollup/apis/stop-job.asciidoc +++ b/docs/reference/rollup/apis/stop-job.asciidoc @@ -24,12 +24,16 @@ Stopping an already stopped job has no action. ==== Query Parameters -`wait_for_stopped` (optional):: - (TimeValue) If unset (the default), the API will return immediately. If set, the API will block for (at maximum) - the specified duration while waiting for the job to stop. If more than `wait_for_stopped` time has passed, the API +`wait_for_completion` (optional):: + (boolean) if set to true, causes the API to block until the indexer state completely stops. If set to false, the + API returns immediately and the indexer will be stopped asynchronously in the background. Defaults to `false`. + +`timeout` (optional):: + (TimeValue) if `wait_for_completion=true`, the API will block for (at maximum) + the specified duration while waiting for the job to stop. If more than `timeout` time has passed, the API will throw a timeout exception. Note: even if a timeout exception is thrown, the stop request is still processing and will eventually move the job to `STOPPED`. The timeout simply means the API call itself timed out while waiting - for the status change. + for the status change. Defaults to `30s` ==== Request Body @@ -97,15 +101,17 @@ A 404 `resource_not_found` exception will be thrown: ===== Waiting for the job to stop Since only a stopped job can be deleted, it can be useful to block the StopJob API until the indexer has fully -stopped. This is accomplished with the `wait_for_stopped` query parameter: +stopped. This is accomplished with the `wait_for_completion` query parameter, and optionally a `timeout`: [source,js] -------------------------------------------------- -POST _xpack/rollup/job/sensor/_stop?wait_for_stopped=10s +POST _xpack/rollup/job/sensor/_stop?wait_for_completion=true&timeout=10s -------------------------------------------------- // CONSOLE // TEST[setup:sensor_started_rollup_job] The parameter will block the API call from returning until either the job has moved to `STOPPED`, or the specified -time has elapsed. If the specified time elapses without the job moving to `STOPPED`, a timeout exception will be thrown. \ No newline at end of file +time has elapsed. If the specified time elapses without the job moving to `STOPPED`, a timeout exception will be thrown. + +If `wait_for_completion=true` is specified without a `timeout`, a default timeout of 30 seconds is used. \ No newline at end of file diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 704695846c178..06c6a5f43cbfe 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -464,7 +464,8 @@ private void wipeRollupJobs() throws IOException, InterruptedException { String jobId = (String) ((Map) jobConfig.get("config")).get("id"); Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop"); request.addParameter("ignore", "404"); - request.addParameter("wait_for_stopped", "10s"); + request.addParameter("wait_for_completion", "true"); + request.addParameter("timeout", "10s"); logger.debug("stopping rollup job [{}]", jobId); adminClient().performRequest(request); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java index ed808952d9408..fbcb1bf1571f1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java @@ -27,12 +27,15 @@ import java.io.IOException; import java.util.Collections; import java.util.Objects; +import java.util.concurrent.TimeUnit; public class StopRollupJobAction extends Action { public static final StopRollupJobAction INSTANCE = new StopRollupJobAction(); public static final String NAME = "cluster:admin/xpack/rollup/stop"; - public static final ParseField WAIT_FOR_STOPPED = new ParseField("wait_for_stopped"); + public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion"); + public static final ParseField TIMEOUT = new ParseField("timeout"); + public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); private StopRollupJobAction() { super(NAME); @@ -45,15 +48,17 @@ public Response newResponse() { public static class Request extends BaseTasksRequest implements ToXContent { private String id; - private TimeValue waitForStopped = null; + private boolean waitForCompletion = false; + private TimeValue timeout = null; public Request (String id) { - this(id, null); + this(id, false, null); } - public Request(String id, @Nullable TimeValue waitForStopped) { + public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) { this.id = ExceptionsHelper.requireNonNull(id, RollupField.ID.getPreferredName()); - this.waitForStopped = waitForStopped; + this.timeout = timeout == null ? DEFAULT_TIMEOUT : timeout; + this.waitForCompletion = waitForCompletion; } public Request() {} @@ -62,9 +67,12 @@ public String getId() { return id; } - @Nullable - public TimeValue waitForStopped() { - return waitForStopped; + public TimeValue timeout() { + return timeout; + } + + public boolean waitForCompletion() { + return waitForCompletion; } @Override @@ -73,7 +81,8 @@ public void readFrom(StreamInput in) throws IOException { id = in.readString(); // TODO change this after backport if (in.getVersion().onOrAfter(Version.CURRENT)) { - waitForStopped = in.readTimeValue(); + waitForCompletion = in.readBoolean(); + timeout = in.readTimeValue(); } } @@ -83,7 +92,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(id); // TODO change this after backport if (out.getVersion().onOrAfter(Version.CURRENT)) { - out.writeTimeValue(waitForStopped); + out.writeBoolean(waitForCompletion); + out.writeTimeValue(timeout); } } @@ -95,15 +105,16 @@ public ActionRequestValidationException validate() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(RollupField.ID.getPreferredName(), id); - if (waitForStopped != null) { - builder.field(WAIT_FOR_STOPPED.getPreferredName(), waitForStopped); + builder.field(WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion); + if (timeout != null) { + builder.field(TIMEOUT.getPreferredName(), timeout); } return builder; } @Override public int hashCode() { - return Objects.hash(id, waitForStopped); + return Objects.hash(id, waitForCompletion, timeout); } @Override @@ -116,7 +127,8 @@ public boolean equals(Object obj) { } Request other = (Request) obj; return Objects.equals(id, other.id) - && Objects.equals(waitForStopped, other.waitForStopped); + && Objects.equals(waitForCompletion, other.waitForCompletion) + && Objects.equals(timeout, other.timeout); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java index 6b3d40db71905..6a636da7e8410 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; +import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction.Response; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.rollup.job.RollupJobTask; @@ -31,14 +32,13 @@ import java.util.function.Consumer; public class TransportStopRollupAction extends TransportTasksAction { - + Response, Response> { @Inject public TransportStopRollupAction(Settings settings, TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) { super(settings, StopRollupJobAction.NAME, clusterService, transportService, actionFilters, - StopRollupJobAction.Request::new, StopRollupJobAction.Response::new, ThreadPool.Names.SAME); + StopRollupJobAction.Request::new, Response::new, ThreadPool.Names.SAME); } @Override @@ -47,39 +47,50 @@ protected void processTasks(StopRollupJobAction.Request request, Consumer listener) { + protected void doExecute(Task task, StopRollupJobAction.Request request, ActionListener listener) { super.doExecute(task, request, listener); } @Override - protected void taskOperation(StopRollupJobAction.Request request, - RollupJobTask jobTask, - ActionListener listener) { + protected void taskOperation(StopRollupJobAction.Request request, RollupJobTask jobTask, ActionListener listener) { if (jobTask.getConfig().getId().equals(request.getId())) { - jobTask.stop(listener); - waitForStopped(request, jobTask, listener); + jobTask.stop(maybeWrapWithBlocking(request, jobTask, listener)); } else { listener.onFailure(new RuntimeException("ID of rollup task [" + jobTask.getConfig().getId() + "] does not match request's ID [" + request.getId() + "]")); } } - private static void waitForStopped(StopRollupJobAction.Request request, - RollupJobTask jobTask, - ActionListener listener) { - if (request.waitForStopped() != null) { - try { - boolean stopped = awaitBusy(() -> ((RollupJobStatus) jobTask.getStatus()).getIndexerState().equals(IndexerState.STOPPED), - request.waitForStopped()); - - if (stopped == false) { - listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.waitForStopped().getStringRep() - + "] while waiting for rollup job [" + request.getId() + "] to stop")); + private static ActionListener maybeWrapWithBlocking(StopRollupJobAction.Request request, + RollupJobTask jobTask, + ActionListener listener) { + if (request.waitForCompletion()) { + return ActionListener.wrap(response -> { + if (response.isStopped()) { + // The Task acknowledged that it is stopped/stopping... wait until the status actually + // changes over before returning + try { + boolean stopped = awaitBusy(() -> ((RollupJobStatus) jobTask.getStatus()) + .getIndexerState().equals(IndexerState.STOPPED), request.timeout()); + + if (stopped) { + // We have successfully confirmed a stop, send back the response + listener.onResponse(response); + } else { + listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep() + + "] while waiting for rollup job [" + request.getId() + "] to stop")); + } + } catch (InterruptedException e) { + listener.onFailure(e); + } + } else { + // Did not acknowledge stop, just return the response + listener.onResponse(response); } - } catch (InterruptedException e) { - listener.onFailure(e); - } + }, listener::onFailure); } + // No request to block, execute async + return listener; } /** @@ -104,7 +115,7 @@ private static boolean awaitBusy(BooleanSupplier breakSupplier, TimeValue maxWai } @Override - protected StopRollupJobAction.Response newResponse(StopRollupJobAction.Request request, List tasks, + protected Response newResponse(StopRollupJobAction.Request request, List tasks, List taskOperationFailures, List failedNodeExceptions) { @@ -124,13 +135,13 @@ protected StopRollupJobAction.Response newResponse(StopRollupJobAction.Request r assert tasks.size() == 1; - boolean allStopped = tasks.stream().allMatch(StopRollupJobAction.Response::isStopped); - return new StopRollupJobAction.Response(allStopped); + boolean allStopped = tasks.stream().allMatch(Response::isStopped); + return new Response(allStopped); } @Override - protected StopRollupJobAction.Response readTaskResponse(StreamInput in) throws IOException { - return new StopRollupJobAction.Response(in); + protected Response readTaskResponse(StreamInput in) throws IOException { + return new Response(in); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java index 60b83e5dc54e4..60b04148660a0 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java @@ -26,8 +26,9 @@ public RestStopRollupJobAction(Settings settings, RestController controller) { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { String id = restRequest.param(RollupField.ID.getPreferredName()); - TimeValue waitForStopped = restRequest.paramAsTime(StopRollupJobAction.WAIT_FOR_STOPPED.getPreferredName(), null); - StopRollupJobAction.Request request = new StopRollupJobAction.Request(id, waitForStopped); + TimeValue timeout = restRequest.paramAsTime(StopRollupJobAction.TIMEOUT.getPreferredName(), StopRollupJobAction.DEFAULT_TIMEOUT); + boolean waitForCompletion = restRequest.paramAsBoolean(StopRollupJobAction.WAIT_FOR_COMPLETION.getPreferredName(), false); + StopRollupJobAction.Request request = new StopRollupJobAction.Request(id, waitForCompletion, timeout); return channel -> client.execute(StopRollupJobAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json index 6f6ce6b6e317d..40ca1e07927b5 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json @@ -13,10 +13,15 @@ } }, "params": { - "wait_for_stopped": { + "wait_for_completion": { + "type": "boolean", + "required": false, + "description": "True if the API should block until the job has fully stopped, false if should be executed async. Defaults to false." + }, + "timeout": { "type": "time", "required": false, - "description": "If set, the API will block for (at maximum) the specified duration while waiting for the job to stop. If unset, returns immediately." + "description": "Block for (at maximum) the specified duration while waiting for the job to stop. Defaults to 30s." } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml index 1384a288d2105..22fae48b33eba 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml @@ -83,7 +83,7 @@ setup: - is_true: stopped --- -"Test wait_for_stopped": +"Test wait_for_completion default timeout": - skip: version: " - 6.5.0" reason: wait_for_stopped option was added in 6.5 @@ -100,6 +100,28 @@ setup: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser xpack.rollup.stop_job: id: foo - wait_for_stopped: 5s + wait_for_completion: true + - is_true: stopped + +--- +"Test wait_for_completion with custom timeout": + - skip: + version: " - 6.5.0" + reason: wait_for_stopped option was added in 6.5 + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + xpack.rollup.start_job: + id: foo + - is_true: started + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + xpack.rollup.stop_job: + id: foo + wait_for_completion: true + timeout: "5s" - is_true: stopped From eced3937908e35cbd375ffb591dcaf8af2715301 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 29 Oct 2018 15:00:05 -0400 Subject: [PATCH 4/6] checkstyle --- .../main/java/org/elasticsearch/test/rest/ESRestTestCase.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index b736791497b22..cb71b8c10e73a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -55,7 +55,6 @@ import org.junit.Before; import javax.net.ssl.SSLContext; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -69,7 +68,6 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; From e7c70c509223aea302a7e3a83e64a99c19f2fe7a Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 8 Nov 2018 11:32:14 -0500 Subject: [PATCH 5/6] Switch to generic threadpool when blocking --- .../action/TransportStopRollupAction.java | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java index a88e9d4823724..424e3345e63b1 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java @@ -32,11 +32,14 @@ public class TransportStopRollupAction extends TransportTasksAction { + private final ThreadPool threadPool; + @Inject - public TransportStopRollupAction(TransportService transportService, - ActionFilters actionFilters, ClusterService clusterService) { + public TransportStopRollupAction(TransportService transportService, ActionFilters actionFilters, + ClusterService clusterService, ThreadPool threadPool) { super(StopRollupJobAction.NAME, clusterService, transportService, actionFilters, StopRollupJobAction.Request::new, StopRollupJobAction.Response::new, ThreadPool.Names.SAME); + this.threadPool = threadPool; } @Override @@ -53,7 +56,7 @@ protected void doExecute(Task task, StopRollupJobAction.Request request, ActionL protected void taskOperation(StopRollupJobAction.Request request, RollupJobTask jobTask, ActionListener listener) { if (jobTask.getConfig().getId().equals(request.getId())) { - jobTask.stop(maybeWrapWithBlocking(request, jobTask, listener)); + jobTask.stop(maybeWrapWithBlocking(request, jobTask, listener, threadPool)); } else { listener.onFailure(new RuntimeException("ID of rollup task [" + jobTask.getConfig().getId() + "] does not match request's ID [" + request.getId() + "]")); @@ -62,26 +65,31 @@ protected void taskOperation(StopRollupJobAction.Request request, RollupJobTask private static ActionListener maybeWrapWithBlocking(StopRollupJobAction.Request request, RollupJobTask jobTask, - ActionListener listener) { + ActionListener listener, + ThreadPool threadPool) { if (request.waitForCompletion()) { return ActionListener.wrap(response -> { if (response.isStopped()) { // The Task acknowledged that it is stopped/stopping... wait until the status actually - // changes over before returning - try { - boolean stopped = awaitBusy(() -> ((RollupJobStatus) jobTask.getStatus()) - .getIndexerState().equals(IndexerState.STOPPED), request.timeout()); - - if (stopped) { - // We have successfully confirmed a stop, send back the response - listener.onResponse(response); - } else { - listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep() - + "] while waiting for rollup job [" + request.getId() + "] to stop")); + // changes over before returning. Switch over to Generic threadpool so + // we don't block the network thread + threadPool.generic().execute(() -> { + try { + boolean stopped = awaitBusy(() -> ((RollupJobStatus) jobTask.getStatus()) + .getIndexerState().equals(IndexerState.STOPPED), request.timeout()); + + if (stopped) { + // We have successfully confirmed a stop, send back the response + listener.onResponse(response); + } else { + listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep() + + "] while waiting for rollup job [" + request.getId() + "] to stop")); + } + } catch (InterruptedException e) { + listener.onFailure(e); } - } catch (InterruptedException e) { - listener.onFailure(e); - } + }); + } else { // Did not acknowledge stop, just return the response listener.onResponse(response); From 46a37d1ea9192e1e217b13b9e0f9413300b7ba00 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Tue, 13 Nov 2018 10:17:16 -0500 Subject: [PATCH 6/6] Update yaml skip version, now in 6.6 --- .../test/resources/rest-api-spec/test/rollup/stop_job.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml index 563211ff8eba3..721a6d41276dd 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml @@ -87,8 +87,8 @@ setup: --- "Test wait_for_completion default timeout": - skip: - version: " - 6.5.0" - reason: wait_for_stopped option was added in 6.5 + version: " - 6.6.0" + reason: wait_for_completion option was added in 6.6 - do: headers: @@ -108,8 +108,8 @@ setup: --- "Test wait_for_completion with custom timeout": - skip: - version: " - 6.5.0" - reason: wait_for_stopped option was added in 6.5 + version: " - 6.6.0" + reason: wait_for_completion option was added in 6.6 - do: headers: