Skip to content

Commit 972c063

Browse files
author
Hendrik Muhs
authored
[ML-DataFrame] add a wait_for_completion option to the stop data frame api (#36701)
add a wait_for_completion option to the stop data frame api
1 parent 7bbbdee commit 972c063

File tree

9 files changed

+172
-32
lines changed

9 files changed

+172
-32
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public final class DataFrameField {
1717
public static final ParseField ID = new ParseField("id");
1818
public static final ParseField JOBS = new ParseField("jobs");
1919
public static final ParseField COUNT = new ParseField("count");
20+
public static final ParseField TIMEOUT = new ParseField("timeout");
21+
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
2022

2123
// common strings
2224
public static final String TASK_NAME = "data_frame/jobs";
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.dataframe;
8+
9+
import java.text.MessageFormat;
10+
import java.util.Locale;
11+
12+
public class DataFrameMessages {
13+
14+
public static final String REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT =
15+
"Timed out after [{0}] while waiting for data frame job [{1}] to stop";
16+
public static final String REST_STOP_JOB_WAIT_FOR_COMPLETION_INTERRUPT = "Interrupted while waiting for data frame job [{0}] to stop";
17+
18+
private DataFrameMessages() {
19+
}
20+
21+
/**
22+
* Returns the message parameter
23+
*
24+
* @param message Should be one of the statics defined in this class
25+
*/
26+
public static String getMessage(String message) {
27+
return message;
28+
}
29+
30+
/**
31+
* Format the message with the supplied arguments
32+
*
33+
* @param message Should be one of the statics defined in this class
34+
* @param args MessageFormat arguments. See {@linkplain MessageFormat#format(Object)}]
35+
*/
36+
public static String getMessage(String message, Object... args) {
37+
return new MessageFormat(message, Locale.ROOT).format(args);
38+
}
39+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.dataframe;
8+
9+
import org.elasticsearch.test.ESTestCase;
10+
11+
public class DataFrameMessagesTests extends ESTestCase {
12+
13+
public void testGetMessage_WithFormatStrings() {
14+
String formattedMessage = DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT, "30s",
15+
"my_job");
16+
assertEquals("Timed out after [30s] while waiting for data frame job [my_job] to stop", formattedMessage);
17+
}
18+
}

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataframePivotRestIT.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import org.apache.http.entity.ContentType;
1010
import org.apache.http.entity.StringEntity;
11-
import org.apache.http.util.EntityUtils;
1211
import org.elasticsearch.client.Request;
1312
import org.elasticsearch.client.Response;
1413
import org.elasticsearch.client.ResponseException;
@@ -20,7 +19,6 @@
2019
import org.junit.AfterClass;
2120
import org.junit.Before;
2221
import java.io.IOException;
23-
import java.util.Arrays;
2422
import java.util.List;
2523
import java.util.Map;
2624
import java.util.concurrent.TimeUnit;
@@ -175,13 +173,20 @@ private void waitForDataFrameGeneration(String jobId) throws Exception {
175173
}, 30, TimeUnit.SECONDS);
176174
}
177175

178-
private int getDataFrameGeneration(String jobId) throws IOException {
176+
private static int getDataFrameGeneration(String jobId) throws IOException {
179177
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + jobId + "/_stats"));
180178

181179
Map<?, ?> jobStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("jobs")).get(0);
182180
return (int) XContentMapValues.extractValue("state.generation", jobStatsAsMap);
183181
}
184182

183+
private static String getDataFrameIndexerState(String jobId) throws IOException {
184+
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + jobId + "/_stats"));
185+
186+
Map<?, ?> jobStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("jobs")).get(0);
187+
return (String) XContentMapValues.extractValue("state.job_state", jobStatsAsMap);
188+
}
189+
185190
private void refreshIndex(String index) throws IOException {
186191
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
187192
}
@@ -208,25 +213,13 @@ private static void wipeDataFrameJobs() throws IOException, InterruptedException
208213
for (Map<String, Object> jobConfig : jobConfigs) {
209214
String jobId = (String) jobConfig.get("id");
210215
Request request = new Request("POST", DATAFRAME_ENDPOINT + jobId + "/_stop");
216+
request.addParameter("wait_for_completion", "true");
217+
request.addParameter("timeout", "10s");
211218
request.addParameter("ignore", "404");
212219
adminClient().performRequest(request);
220+
assertEquals("stopped", getDataFrameIndexerState(jobId));
213221
}
214222

215-
// TODO this is temporary until the StopDataFrameJob API gains the ability to block until stopped
216-
boolean stopped = awaitBusy(() -> {
217-
Request request = new Request("GET", DATAFRAME_ENDPOINT + "_all");
218-
try {
219-
Response jobsResponse = adminClient().performRequest(request);
220-
String body = EntityUtils.toString(jobsResponse.getEntity());
221-
// If the body contains any of the non-stopped states, at least one job is not finished yet
222-
return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
223-
} catch (IOException e) {
224-
return false;
225-
}
226-
}, 10, TimeUnit.SECONDS);
227-
228-
assertTrue("Timed out waiting for data frame job(s) to stop", stopped);
229-
230223
for (Map<String, Object> jobConfig : jobConfigs) {
231224
String jobId = (String) jobConfig.get("id");
232225
Request request = new Request("DELETE", DATAFRAME_ENDPOINT + jobId);

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobAction.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@
1111
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
1212
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
1313
import org.elasticsearch.client.ElasticsearchClient;
14+
import org.elasticsearch.common.Nullable;
1415
import org.elasticsearch.common.io.stream.StreamInput;
1516
import org.elasticsearch.common.io.stream.StreamOutput;
1617
import org.elasticsearch.common.io.stream.Writeable;
17-
import org.elasticsearch.common.xcontent.ObjectParser;
18+
import org.elasticsearch.common.unit.TimeValue;
1819
import org.elasticsearch.common.xcontent.ToXContent;
1920
import org.elasticsearch.common.xcontent.ToXContentObject;
2021
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -25,12 +26,15 @@
2526
import java.io.IOException;
2627
import java.util.Collections;
2728
import java.util.Objects;
29+
import java.util.concurrent.TimeUnit;
2830

2931
public class StopDataFrameJobAction extends Action<StopDataFrameJobAction.Response> {
3032

3133
public static final StopDataFrameJobAction INSTANCE = new StopDataFrameJobAction();
3234
public static final String NAME = "cluster:admin/data_frame/stop";
3335

36+
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
37+
3438
private StopDataFrameJobAction() {
3539
super(NAME);
3640
}
@@ -42,23 +46,24 @@ public Response newResponse() {
4246

4347
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
4448
private String id;
49+
private final boolean waitForCompletion;
4550

46-
public static ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
47-
48-
static {
49-
PARSER.declareString(Request::setId, DataFrameField.ID);
50-
}
51-
52-
public Request(String id) {
51+
public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) {
5352
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
53+
this.waitForCompletion = waitForCompletion;
54+
55+
// use the timeout value already present in BaseTasksRequest
56+
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
5457
}
5558

5659
public Request() {
60+
this(null, false, null);
5761
}
5862

5963
public Request(StreamInput in) throws IOException {
6064
super(in);
6165
id = in.readString();
66+
waitForCompletion = in.readBoolean();
6267
}
6368

6469
public String getId() {
@@ -69,10 +74,15 @@ public void setId(String id) {
6974
this.id = id;
7075
}
7176

77+
public boolean waitForCompletion() {
78+
return waitForCompletion;
79+
}
80+
7281
@Override
7382
public void writeTo(StreamOutput out) throws IOException {
7483
super.writeTo(out);
7584
out.writeString(id);
85+
out.writeBoolean(waitForCompletion);
7686
}
7787

7888
@Override
@@ -83,12 +93,17 @@ public ActionRequestValidationException validate() {
8393
@Override
8494
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8595
builder.field(DataFrameField.ID.getPreferredName(), id);
96+
builder.field(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion);
97+
if (this.getTimeout() != null) {
98+
builder.field(DataFrameField.TIMEOUT.getPreferredName(), this.getTimeout());
99+
}
86100
return builder;
87101
}
88102

89103
@Override
90104
public int hashCode() {
91-
return Objects.hash(id);
105+
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
106+
return Objects.hash(id, waitForCompletion, this.getTimeout());
92107
}
93108

94109
@Override
@@ -101,7 +116,13 @@ public boolean equals(Object obj) {
101116
return false;
102117
}
103118
Request other = (Request) obj;
104-
return Objects.equals(id, other.id);
119+
120+
// the base class does not implement equals, therefore we need to compare timeout ourselves
121+
if (Objects.equals(this.getTimeout(), other.getTimeout()) == false) {
122+
return false;
123+
}
124+
125+
return Objects.equals(id, other.id) && Objects.equals(waitForCompletion, other.waitForCompletion);
105126
}
106127

107128
@Override

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameJobAction.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package org.elasticsearch.xpack.dataframe.action;
77

8+
import org.elasticsearch.ElasticsearchException;
9+
import org.elasticsearch.ElasticsearchTimeoutException;
810
import org.elasticsearch.ExceptionsHelper;
911
import org.elasticsearch.ResourceNotFoundException;
1012
import org.elasticsearch.action.ActionListener;
@@ -14,21 +16,30 @@
1416
import org.elasticsearch.action.support.tasks.TransportTasksAction;
1517
import org.elasticsearch.cluster.service.ClusterService;
1618
import org.elasticsearch.common.inject.Inject;
19+
import org.elasticsearch.common.unit.TimeValue;
1720
import org.elasticsearch.tasks.Task;
1821
import org.elasticsearch.threadpool.ThreadPool;
1922
import org.elasticsearch.transport.TransportService;
23+
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
2024
import org.elasticsearch.xpack.dataframe.job.DataFrameJobTask;
2125

2226
import java.util.List;
2327

28+
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
29+
2430
public class TransportStopDataFrameJobAction extends
2531
TransportTasksAction<DataFrameJobTask, StopDataFrameJobAction.Request,
2632
StopDataFrameJobAction.Response, StopDataFrameJobAction.Response> {
2733

34+
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
35+
private final ThreadPool threadPool;
36+
2837
@Inject
29-
public TransportStopDataFrameJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
38+
public TransportStopDataFrameJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService,
39+
ThreadPool threadPool) {
3040
super(StopDataFrameJobAction.NAME, clusterService, transportService, actionFilters, StopDataFrameJobAction.Request::new,
3141
StopDataFrameJobAction.Response::new, StopDataFrameJobAction.Response::new, ThreadPool.Names.SAME);
42+
this.threadPool = threadPool;
3243
}
3344

3445
@Override
@@ -41,7 +52,42 @@ protected void doExecute(Task task, StopDataFrameJobAction.Request request,
4152
protected void taskOperation(StopDataFrameJobAction.Request request, DataFrameJobTask jobTask,
4253
ActionListener<StopDataFrameJobAction.Response> listener) {
4354
if (jobTask.getConfig().getId().equals(request.getId())) {
44-
jobTask.stop(listener);
55+
if (request.waitForCompletion() == false) {
56+
jobTask.stop(listener);
57+
} else {
58+
ActionListener<StopDataFrameJobAction.Response> blockingListener = ActionListener.wrap(response -> {
59+
if (response.isStopped()) {
60+
// The Task acknowledged that it is stopped/stopping... wait until the status actually
61+
// changes over before returning. Switch over to Generic threadpool so
62+
// we don't block the network thread
63+
threadPool.generic().execute(() -> {
64+
try {
65+
long untilInNanos = System.nanoTime() + request.getTimeout().getNanos();
66+
67+
while (System.nanoTime() - untilInNanos < 0) {
68+
if (jobTask.isStopped()) {
69+
listener.onResponse(response);
70+
return;
71+
}
72+
Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
73+
}
74+
// ran out of time
75+
listener.onFailure(new ElasticsearchTimeoutException(
76+
DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_TIMEOUT,
77+
request.getTimeout().getStringRep(), request.getId())));
78+
} catch (InterruptedException e) {
79+
listener.onFailure(new ElasticsearchException(DataFrameMessages
80+
.getMessage(DataFrameMessages.REST_STOP_JOB_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e));
81+
}
82+
});
83+
} else {
84+
// Did not acknowledge stop, just return the response
85+
listener.onResponse(response);
86+
}
87+
}, listener::onFailure);
88+
89+
jobTask.stop(blockingListener);
90+
}
4591
} else {
4692
listener.onFailure(new RuntimeException("ID of data frame indexer task [" + jobTask.getConfig().getId()
4793
+ "] does not match request's ID [" + request.getId() + "]"));

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/job/DataFrameJobTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ public long getGeneration() {
101101
return generation.get();
102102
}
103103

104+
public boolean isStopped() {
105+
return indexer.getState().equals(IndexerState.STOPPED);
106+
}
107+
104108
public synchronized void start(ActionListener<Response> listener) {
105109
final IndexerState prevState = indexer.getState();
106110
if (prevState != IndexerState.STOPPED) {

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameJobAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.client.node.NodeClient;
99
import org.elasticsearch.common.settings.Settings;
10+
import org.elasticsearch.common.unit.TimeValue;
1011
import org.elasticsearch.rest.BaseRestHandler;
1112
import org.elasticsearch.rest.RestController;
1213
import org.elasticsearch.rest.RestRequest;
@@ -26,7 +27,10 @@ public RestStopDataFrameJobAction(Settings settings, RestController controller)
2627
@Override
2728
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
2829
String id = restRequest.param(DataFrameField.ID.getPreferredName());
29-
StopDataFrameJobAction.Request request = new StopDataFrameJobAction.Request(id);
30+
TimeValue timeout = restRequest.paramAsTime(DataFrameField.TIMEOUT.getPreferredName(), StopDataFrameJobAction.DEFAULT_TIMEOUT);
31+
boolean waitForCompletion = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), false);
32+
33+
StopDataFrameJobAction.Request request = new StopDataFrameJobAction.Request(id, waitForCompletion, timeout);
3034

3135
return channel -> client.execute(StopDataFrameJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
3236
}

x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/StopDataFrameJobActionRequestTests.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,31 @@
77
package org.elasticsearch.xpack.dataframe.action;
88

99
import org.elasticsearch.common.io.stream.Writeable;
10+
import org.elasticsearch.common.unit.TimeValue;
1011
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1112
import org.elasticsearch.xpack.dataframe.action.StopDataFrameJobAction.Request;
1213

1314
public class StopDataFrameJobActionRequestTests extends AbstractWireSerializingTestCase<Request> {
1415

1516
@Override
1617
protected Request createTestInstance() {
17-
return new Request(randomAlphaOfLengthBetween(1, 10));
18+
TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null;
19+
return new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), timeout);
1820
}
1921

2022
@Override
2123
protected Writeable.Reader<Request> instanceReader() {
2224
return Request::new;
2325
}
26+
27+
public void testSameButDifferentTimeout() {
28+
String id = randomAlphaOfLengthBetween(1, 10);
29+
boolean waitForCompletion = randomBoolean();
30+
31+
Request r1 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(10));
32+
Request r2 = new Request(id, waitForCompletion, TimeValue.timeValueSeconds(20));
33+
34+
assertNotEquals(r1,r2);
35+
assertNotEquals(r1.hashCode(),r2.hashCode());
36+
}
2437
}

0 commit comments

Comments
 (0)