Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/reference/rollup/apis/stop-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ Stopping an already stopped job has no action.
`job_id` (required)::
(string) Identifier for the job

==== Query Parameters

`wait_for_completion` (optional)::
(boolean) if set to true, causes the API to block until the indexer state completely stops. If set to false, the
API returns immediately and the indexer will be stopped asynchronously in the background. Defaults to `false`.

`timeout` (optional)::
(TimeValue) if `wait_for_completion=true`, the API will block for (at maximum)
the specified duration while waiting for the job to stop. If more than `timeout` time has passed, the API
will throw a timeout exception. Note: even if a timeout exception is thrown, the stop request is still processing and
will eventually move the job to `STOPPED`. The timeout simply means the API call itself timed out while waiting
for the status change. Defaults to `30s`

==== Request Body

Expand Down Expand Up @@ -85,3 +97,21 @@ A 404 `resource_not_found` exception will be thrown:
}
----
// TESTRESPONSE[s/"stack_trace": .../"stack_trace": $body.$_path/]

===== Waiting for the job to stop

Since only a stopped job can be deleted, it can be useful to block the StopJob API until the indexer has fully
stopped. This is accomplished with the `wait_for_completion` query parameter, and optionally a `timeout`:


[source,js]
--------------------------------------------------
POST _xpack/rollup/job/sensor/_stop?wait_for_completion=true&timeout=10s
--------------------------------------------------
// CONSOLE
// TEST[setup:sensor_started_rollup_job]

The parameter will block the API call from returning until either the job has moved to `STOPPED`, or the specified
time has elapsed. If the specified time elapses without the job moving to `STOPPED`, a timeout exception will be thrown.

If `wait_for_completion=true` is specified without a `timeout`, a default timeout of 30 seconds is used.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.junit.Before;

import javax.net.ssl.SSLContext;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -69,7 +68,6 @@
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -491,25 +489,12 @@ private void wipeRollupJobs() throws IOException, InterruptedException {
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop");
request.addParameter("ignore", "404");
request.addParameter("wait_for_completion", "true");
request.addParameter("timeout", "10s");
logger.debug("stopping rollup job [{}]", jobId);
adminClient().performRequest(request);
}

// TODO this is temporary until StopJob API gains the ability to block until stopped
boolean stopped = awaitBusy(() -> {
Request request = new Request("GET", "/_xpack/rollup/job/_all");
try {
Response jobsResponse = adminClient().performRequest(request);
String body = EntityUtils.toString(jobsResponse.getEntity());
// If the body contains any of the non-stopped states, at least one job is not finished yet
return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
} catch (IOException e) {
return false;
}
}, 10, TimeUnit.SECONDS);

assertTrue("Timed out waiting for rollup job(s) to stop", stopped);

for (Map<String, Object> jobConfig : jobConfigs) {
@SuppressWarnings("unchecked")
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
*/
package org.elasticsearch.xpack.core.rollup.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -23,11 +27,15 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {

public static final StopRollupJobAction INSTANCE = new StopRollupJobAction();
public static final String NAME = "cluster:admin/xpack/rollup/stop";
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);

private StopRollupJobAction() {
super(NAME);
Expand All @@ -40,9 +48,17 @@ public Response newResponse() {

public static class Request extends BaseTasksRequest<Request> implements ToXContent {
private String id;
private boolean waitForCompletion = false;
private TimeValue timeout = null;

public Request(String id) {
public Request (String id) {
this(id, false, null);
}

public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) {
this.id = ExceptionsHelper.requireNonNull(id, RollupField.ID.getPreferredName());
this.timeout = timeout == null ? DEFAULT_TIMEOUT : timeout;
this.waitForCompletion = waitForCompletion;
}

public Request() {}
Expand All @@ -51,16 +67,34 @@ public String getId() {
return id;
}

public TimeValue timeout() {
return timeout;
}

public boolean waitForCompletion() {
return waitForCompletion;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
// TODO change this after backport
if (in.getVersion().onOrAfter(Version.CURRENT)) {
waitForCompletion = in.readBoolean();
timeout = in.readTimeValue();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
// TODO change this after backport
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeBoolean(waitForCompletion);
out.writeTimeValue(timeout);
}
}

@Override
Expand All @@ -71,12 +105,16 @@ public ActionRequestValidationException validate() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(RollupField.ID.getPreferredName(), id);
builder.field(WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion);
if (timeout != null) {
builder.field(TIMEOUT.getPreferredName(), timeout);
}
return builder;
}

@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, waitForCompletion, timeout);
}

@Override
Expand All @@ -88,7 +126,9 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id)
&& Objects.equals(waitForCompletion, other.waitForCompletion)
&& Objects.equals(timeout, other.timeout);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.rollup.action;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
Expand All @@ -14,24 +15,31 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.rollup.job.RollupJobTask;

import java.io.IOException;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

public class TransportStopRollupAction extends TransportTasksAction<RollupJobTask, StopRollupJobAction.Request,
StopRollupJobAction.Response, StopRollupJobAction.Response> {
StopRollupJobAction.Response, StopRollupJobAction.Response> {

private final ThreadPool threadPool;

@Inject
public TransportStopRollupAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
public TransportStopRollupAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, ThreadPool threadPool) {
super(StopRollupJobAction.NAME, clusterService, transportService, actionFilters,
StopRollupJobAction.Request::new, StopRollupJobAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
}

@Override
Expand All @@ -45,17 +53,74 @@ protected void doExecute(Task task, StopRollupJobAction.Request request, ActionL
}

@Override
protected void taskOperation(StopRollupJobAction.Request request,
RollupJobTask jobTask,
protected void taskOperation(StopRollupJobAction.Request request, RollupJobTask jobTask,
ActionListener<StopRollupJobAction.Response> listener) {
if (jobTask.getConfig().getId().equals(request.getId())) {
jobTask.stop(listener);
jobTask.stop(maybeWrapWithBlocking(request, jobTask, listener, threadPool));
} else {
listener.onFailure(new RuntimeException("ID of rollup task [" + jobTask.getConfig().getId()
+ "] does not match request's ID [" + request.getId() + "]"));
}
}

private static ActionListener<StopRollupJobAction.Response> maybeWrapWithBlocking(StopRollupJobAction.Request request,
RollupJobTask jobTask,
ActionListener<StopRollupJobAction.Response> listener,
ThreadPool threadPool) {
if (request.waitForCompletion()) {
return ActionListener.wrap(response -> {
if (response.isStopped()) {
// The Task acknowledged that it is stopped/stopping... wait until the status actually
// changes over before returning. Switch over to Generic threadpool so
// we don't block the network thread
threadPool.generic().execute(() -> {
try {
boolean stopped = awaitBusy(() -> ((RollupJobStatus) jobTask.getStatus())
.getIndexerState().equals(IndexerState.STOPPED), request.timeout());

if (stopped) {
// We have successfully confirmed a stop, send back the response
listener.onResponse(response);
} else {
listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep()
+ "] while waiting for rollup job [" + request.getId() + "] to stop"));
}
} catch (InterruptedException e) {
listener.onFailure(e);
}
});

} else {
// Did not acknowledge stop, just return the response
listener.onResponse(response);
}
}, listener::onFailure);
}
// No request to block, execute async
return listener;
}

/**
* Lifted from ESTestCase, must stay private and do not reuse! This is temporary until
* the Rollup state refactor makes it unnecessary to await on a status change
*/
private static boolean awaitBusy(BooleanSupplier breakSupplier, TimeValue maxWaitTime) throws InterruptedException {
long maxTimeInMillis = maxWaitTime.getMillis();
long timeInMillis = 1;
long sum = 0;
while (sum + timeInMillis < maxTimeInMillis) {
if (breakSupplier.getAsBoolean()) {
return true;
}
Thread.sleep(timeInMillis);
sum += timeInMillis;
timeInMillis = Math.min(1000L, timeInMillis * 2);
}
timeInMillis = maxTimeInMillis - sum;
Thread.sleep(Math.max(timeInMillis, 0));
return breakSupplier.getAsBoolean();
}

@Override
protected StopRollupJobAction.Response newResponse(StopRollupJobAction.Request request, List<StopRollupJobAction.Response> tasks,
List<TaskOperationFailure> taskOperationFailures,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
Expand All @@ -15,8 +16,6 @@
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.rollup.Rollup;

import java.io.IOException;

public class RestStopRollupJobAction extends BaseRestHandler {

public RestStopRollupJobAction(Settings settings, RestController controller) {
Expand All @@ -25,9 +24,11 @@ public RestStopRollupJobAction(Settings settings, RestController controller) {
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
String id = restRequest.param(RollupField.ID.getPreferredName());
StopRollupJobAction.Request request = new StopRollupJobAction.Request(id);
TimeValue timeout = restRequest.paramAsTime(StopRollupJobAction.TIMEOUT.getPreferredName(), StopRollupJobAction.DEFAULT_TIMEOUT);
boolean waitForCompletion = restRequest.paramAsBoolean(StopRollupJobAction.WAIT_FOR_COMPLETION.getPreferredName(), false);
StopRollupJobAction.Request request = new StopRollupJobAction.Request(id, waitForCompletion, timeout);

return channel -> client.execute(StopRollupJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
"required": true,
"description": "The ID of the job to stop"
}
},
"params": {
"wait_for_completion": {
"type": "boolean",
"required": false,
"description": "True if the API should block until the job has fully stopped, false if should be executed async. Defaults to false."
},
"timeout": {
"type": "time",
"required": false,
"description": "Block for (at maximum) the specified duration while waiting for the job to stop. Defaults to 30s."
}
}
}
}
Expand Down
Loading