@@ -17,6 +17,8 @@ public final class DataFrameField {
public static final ParseField ID = new ParseField("id");
public static final ParseField JOBS = new ParseField("jobs");
public static final ParseField COUNT = new ParseField("count");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");

// common strings
public static final String TASK_NAME = "data_frame/jobs";
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import java.text.MessageFormat;
import java.util.Locale;

public class DataFrameMessages {

public static final String REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT =
"Timed out after [{0}] while waiting for data frame job [{1}] to stop";
public static final String REST_STOP_JOB_WAIT_FOR_COMPLETION_INTERRUPT = "Interrupted while waiting for data frame job [{0}] to stop";

private DataFrameMessages() {
}

/**
* Returns the message parameter
*
* @param message Should be one of the statics defined in this class
*/
public static String getMessage(String message) {
return message;
}

/**
* Format the message with the supplied arguments
*
* @param message Should be one of the statics defined in this class
* @param args MessageFormat arguments. See {@linkplain MessageFormat#format(Object)}
*/
public static String getMessage(String message, Object... args) {
return new MessageFormat(message, Locale.ROOT).format(args);
}
}
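For reference, a small sketch (the job id is illustrative, not taken from this change) of how the single-placeholder interrupt message resolves through the MessageFormat path above; the test file that follows exercises the two-placeholder timeout message:

// Illustrative only: {0} is substituted via MessageFormat in Locale.ROOT.
String msg = DataFrameMessages.getMessage(
        DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_INTERRUPT, "my_job");
// msg is "Interrupted while waiting for data frame job [my_job] to stop"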
@@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import org.elasticsearch.test.ESTestCase;

public class DataFrameMessagesTests extends ESTestCase {

public void testGetMessage_WithFormatStrings() {
String formattedMessage = DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT, "30s",
"my_job");
assertEquals("Timed out after [30s] while waiting for data frame job [my_job] to stop", formattedMessage);
}
}
@@ -8,7 +8,6 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@@ -20,7 +19,6 @@
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -175,13 +173,20 @@ private void waitForDataFrameGeneration(String jobId) throws Exception {
}, 30, TimeUnit.SECONDS);
}

private int getDataFrameGeneration(String jobId) throws IOException {
private static int getDataFrameGeneration(String jobId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + jobId + "/_stats"));

Map<?, ?> jobStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("jobs")).get(0);
return (int) XContentMapValues.extractValue("state.generation", jobStatsAsMap);
}

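// Fetches the job's _stats and returns the indexer state (e.g. "stopped"); used below to verify jobs really stopped.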
private static String getDataFrameIndexerState(String jobId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + jobId + "/_stats"));

Map<?, ?> jobStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("jobs")).get(0);
return (String) XContentMapValues.extractValue("state.job_state", jobStatsAsMap);
}

private void refreshIndex(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
}
@@ -208,25 +213,13 @@ private static void wipeDataFrameJobs() throws IOException, InterruptedException
for (Map<String, Object> jobConfig : jobConfigs) {
String jobId = (String) jobConfig.get("id");
Request request = new Request("POST", DATAFRAME_ENDPOINT + jobId + "/_stop");
request.addParameter("wait_for_completion", "true");
request.addParameter("timeout", "10s");
request.addParameter("ignore", "404");
adminClient().performRequest(request);
assertEquals("stopped", getDataFrameIndexerState(jobId));
}

// TODO this is temporary until the StopDataFrameJob API gains the ability to block until stopped
boolean stopped = awaitBusy(() -> {
Request request = new Request("GET", DATAFRAME_ENDPOINT + "_all");
try {
Response jobsResponse = adminClient().performRequest(request);
String body = EntityUtils.toString(jobsResponse.getEntity());
// If the body contains any of the non-stopped states, at least one job is not finished yet
return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
} catch (IOException e) {
return false;
}
}, 10, TimeUnit.SECONDS);

assertTrue("Timed out waiting for data frame job(s) to stop", stopped);

for (Map<String, Object> jobConfig : jobConfigs) {
String jobId = (String) jobConfig.get("id");
Request request = new Request("DELETE", DATAFRAME_ENDPOINT + jobId);
@@ -11,10 +11,11 @@
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -25,12 +26,15 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class StopDataFrameJobAction extends Action<StopDataFrameJobAction.Response> {

public static final StopDataFrameJobAction INSTANCE = new StopDataFrameJobAction();
public static final String NAME = "cluster:admin/data_frame/stop";

public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);

private StopDataFrameJobAction() {
super(NAME);
}
@@ -42,23 +46,24 @@ public Response newResponse() {

public static class Request extends BaseTasksRequest<Request> implements ToXContent {
private String id;
private final boolean waitForCompletion;

public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);

static {
PARSER.declareString(Request::setId, DataFrameField.ID);
}

public Request(String id) {
public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.waitForCompletion = waitForCompletion;

// use the timeout value already present in BaseTasksRequest
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
}

public Request() {
this(null, false, null);
}

public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
waitForCompletion = in.readBoolean();
}

public String getId() {
@@ -69,10 +74,15 @@ public void setId(String id) {
this.id = id;
}

public boolean waitForCompletion() {
return waitForCompletion;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(waitForCompletion);
}

@Override
@@ -83,12 +93,17 @@ public ActionRequestValidationException validate() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion);
if (this.getTimeout() != null) {
builder.field(DataFrameField.TIMEOUT.getPreferredName(), this.getTimeout());
}
return builder;
}

@Override
public int hashCode() {
return Objects.hash(id);
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(id, waitForCompletion, this.getTimeout());
}

@Override
@@ -101,7 +116,13 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);

// the base class does not implement equals, therefore we need to compare timeout ourselves
if (Objects.equals(this.getTimeout(), other.getTimeout()) == false) {
return false;
}

return Objects.equals(id, other.id) && Objects.equals(waitForCompletion, other.waitForCompletion);
}

@Override
@@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.dataframe.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
@@ -14,21 +16,30 @@
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.dataframe.job.DataFrameJobTask;

import java.util.List;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;

public class TransportStopDataFrameJobAction extends
TransportTasksAction<DataFrameJobTask, StopDataFrameJobAction.Request,
StopDataFrameJobAction.Response, StopDataFrameJobAction.Response> {

private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private final ThreadPool threadPool;

@Inject
public TransportStopDataFrameJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
public TransportStopDataFrameJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService,
ThreadPool threadPool) {
super(StopDataFrameJobAction.NAME, clusterService, transportService, actionFilters, StopDataFrameJobAction.Request::new,
StopDataFrameJobAction.Response::new, StopDataFrameJobAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
}

@Override
@@ -41,7 +52,42 @@ protected void doExecute(Task task, StopDataFrameJobAction.Request request,
protected void taskOperation(StopDataFrameJobAction.Request request, DataFrameJobTask jobTask,
ActionListener<StopDataFrameJobAction.Response> listener) {
if (jobTask.getConfig().getId().equals(request.getId())) {
jobTask.stop(listener);
if (request.waitForCompletion() == false) {
jobTask.stop(listener);
} else {
ActionListener<StopDataFrameJobAction.Response> blockingListener = ActionListener.wrap(response -> {
if (response.isStopped()) {
// The Task acknowledged that it is stopped/stopping... wait until the status actually
// changes over before returning. Switch over to Generic threadpool so
// we don't block the network thread
threadPool.generic().execute(() -> {
try {
long untilInNanos = System.nanoTime() + request.getTimeout().getNanos();

while (System.nanoTime() - untilInNanos < 0) {
if (jobTask.isStopped()) {
listener.onResponse(response);
return;
}
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
}
// ran out of time
listener.onFailure(new ElasticsearchTimeoutException(
DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT,
request.getTimeout().getStringRep(), request.getId())));
} catch (InterruptedException e) {
listener.onFailure(new ElasticsearchException(DataFrameMessages
.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e));
}
});
} else {
// Did not acknowledge stop, just return the response
listener.onResponse(response);
}
}, listener::onFailure);

jobTask.stop(blockingListener);
}
} else {
listener.onFailure(new RuntimeException("ID of data frame indexer task [" + jobTask.getConfig().getId()
+ "] does not match request's ID [" + request.getId() + "]"));
@@ -101,6 +101,10 @@ public long getGeneration() {
return generation.get();
}

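// True only once the indexer has fully transitioned to STOPPED; the stop action's wait_for_completion polling relies on this.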
public boolean isStopped() {
return indexer.getState().equals(IndexerState.STOPPED);
}

public synchronized void start(ActionListener<Response> listener) {
final IndexerState prevState = indexer.getState();
if (prevState != IndexerState.STOPPED) {
@@ -7,6 +7,7 @@

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@@ -26,7 +27,10 @@ public RestStopDataFrameJobAction(Settings settings, RestController controller)
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String id = restRequest.param(DataFrameField.ID.getPreferredName());
StopDataFrameJobAction.Request request = new StopDataFrameJobAction.Request(id);
TimeValue timeout = restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), StopDataFrameJobAction.DEFAULT_TIMEOUT);
boolean waitForCompletion = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), false);

StopDataFrameJobAction.Request request = new StopDataFrameJobAction.Request(id, waitForCompletion, timeout);

return channel -> client.execute(StopDataFrameJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
@@ -7,18 +7,31 @@
package org.elasticsearch.xpack.dataframe.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.dataframe.action.StopDataFrameJobAction.Request;

public class StopDataFrameJobActionRequestTests extends AbstractWireSerializingTestCase<Request> {

@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 10));
TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), timeout);
}

@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}

public void testSameButDifferentTimeout() {
String id = randomAlphaOfLengthBetween(1, 10);
boolean waitForCompletion = randomBoolean();

Request r1 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(10));
Request r2 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(20));

assertNotEquals(r1, r2);
assertNotEquals(r1.hashCode(), r2.hashCode());
}
}