Skip to content

Commit 87778c6

Browse files
[ML] Allow asynchronous job deletion (#34058)
This changes the delete job API by adding the choice to delete a job asynchronously. The commit adds a `wait_for_completion` parameter to the delete job request. When set to `false`, the action returns immediately and the response contains the task id. This also changes the handling of subsequent delete requests for a job that is already being deleted. It now uses the task framework to check if the job is being deleted instead of the cluster state. This is a beneficial for it is going to also be working once the job configs are moved out of the cluster state and into an index. Also, force delete requests that are waiting for the job to be deleted will not proceed with the deletion if the first task fails. This will prevent overloading the cluster. Instead, the failure is communicated better via notifications so that the user may retry. Finally, this makes the `deleting` property of the job visible (also it was renamed from `deleted`). This allows a client to render a deleting job differently. Closes #32836
1 parent 82b5794 commit 87778c6

File tree

31 files changed

+611
-428
lines changed

31 files changed

+611
-428
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,12 @@ static Request deleteJob(DeleteJobRequest deleteJobRequest) {
148148
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
149149

150150
RequestConverters.Params params = new RequestConverters.Params(request);
151-
params.putParam("force", Boolean.toString(deleteJobRequest.isForce()));
151+
if (deleteJobRequest.getForce() != null) {
152+
params.putParam("force", Boolean.toString(deleteJobRequest.getForce()));
153+
}
154+
if (deleteJobRequest.getWaitForCompletion() != null) {
155+
params.putParam("wait_for_completion", Boolean.toString(deleteJobRequest.getWaitForCompletion()));
156+
}
152157

153158
return request;
154159
}

client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
2727
import org.elasticsearch.client.ml.DeleteForecastRequest;
2828
import org.elasticsearch.client.ml.DeleteJobRequest;
29+
import org.elasticsearch.client.ml.DeleteJobResponse;
2930
import org.elasticsearch.client.ml.FlushJobRequest;
3031
import org.elasticsearch.client.ml.FlushJobResponse;
3132
import org.elasticsearch.client.ml.ForecastJobRequest;
@@ -211,14 +212,15 @@ public void getJobStatsAsync(GetJobStatsRequest request, RequestOptions options,
211212
*
212213
* @param request The request to delete the job
213214
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
214-
* @return action acknowledgement
215+
* @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for
216+
* completion
215217
* @throws IOException when there is a serialization issue sending the request or receiving the response
216218
*/
217-
public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
219+
public DeleteJobResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
218220
return restHighLevelClient.performRequestAndParseEntity(request,
219221
MLRequestConverters::deleteJob,
220222
options,
221-
AcknowledgedResponse::fromXContent,
223+
DeleteJobResponse::fromXContent,
222224
Collections.emptySet());
223225
}
224226

@@ -232,11 +234,11 @@ public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions o
232234
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
233235
* @param listener Listener to be notified upon request completion
234236
*/
235-
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
237+
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<DeleteJobResponse> listener) {
236238
restHighLevelClient.performRequestAsyncAndParseEntity(request,
237239
MLRequestConverters::deleteJob,
238240
options,
239-
AcknowledgedResponse::fromXContent,
241+
DeleteJobResponse::fromXContent,
240242
listener,
241243
Collections.emptySet());
242244
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
public class DeleteJobRequest extends ActionRequest {
3030

3131
private String jobId;
32-
private boolean force;
32+
private Boolean force;
33+
private Boolean waitForCompletion;
3334

3435
public DeleteJobRequest(String jobId) {
3536
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
@@ -47,7 +48,7 @@ public void setJobId(String jobId) {
4748
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
4849
}
4950

50-
public boolean isForce() {
51+
public Boolean getForce() {
5152
return force;
5253
}
5354

@@ -57,10 +58,24 @@ public boolean isForce() {
5758
*
5859
* @param force When {@code true} forcefully delete an opened job. Defaults to {@code false}
5960
*/
60-
public void setForce(boolean force) {
61+
public void setForce(Boolean force) {
6162
this.force = force;
6263
}
6364

65+
public Boolean getWaitForCompletion() {
66+
return waitForCompletion;
67+
}
68+
69+
/**
70+
* Set whether this request should wait until the operation has completed before returning
71+
* @param waitForCompletion When {@code true} the call will wait for the job deletion to complete.
72+
* Otherwise, the deletion will be executed asynchronously and the response
73+
* will contain the task id.
74+
*/
75+
public void setWaitForCompletion(Boolean waitForCompletion) {
76+
this.waitForCompletion = waitForCompletion;
77+
}
78+
6479
@Override
6580
public ActionRequestValidationException validate() {
6681
return null;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionResponse;
22+
import org.elasticsearch.common.Nullable;
23+
import org.elasticsearch.common.ParseField;
24+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
25+
import org.elasticsearch.common.xcontent.ObjectParser;
26+
import org.elasticsearch.common.xcontent.ToXContentObject;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.common.xcontent.XContentParser;
29+
import org.elasticsearch.tasks.TaskId;
30+
31+
import java.io.IOException;
32+
import java.util.Objects;
33+
34+
/**
35+
* Response object that contains the acknowledgement or the task id
36+
* depending on whether the delete job action was requested to wait for completion.
37+
*/
38+
public class DeleteJobResponse extends ActionResponse implements ToXContentObject {
39+
40+
private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
41+
private static final ParseField TASK = new ParseField("task");
42+
43+
public static final ConstructingObjectParser<DeleteJobResponse, Void> PARSER = new ConstructingObjectParser<>("delete_job_response",
44+
true, a-> new DeleteJobResponse((Boolean) a[0], (TaskId) a[1]));
45+
46+
static {
47+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED);
48+
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING);
49+
}
50+
51+
public static DeleteJobResponse fromXContent(XContentParser parser) throws IOException {
52+
return PARSER.parse(parser, null);
53+
}
54+
55+
private final Boolean acknowledged;
56+
private final TaskId task;
57+
58+
DeleteJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) {
59+
assert acknowledged != null || task != null;
60+
this.acknowledged = acknowledged;
61+
this.task = task;
62+
}
63+
64+
/**
65+
* Get the action acknowledgement
66+
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code false} or
67+
* otherwise a {@code boolean} that indicates whether the job was deleted successfully.
68+
*/
69+
public Boolean getAcknowledged() {
70+
return acknowledged;
71+
}
72+
73+
/**
74+
* Get the task id
75+
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code true} or
76+
* otherwise the id of the job deletion task.
77+
*/
78+
public TaskId getTask() {
79+
return task;
80+
}
81+
82+
@Override
83+
public int hashCode() {
84+
return Objects.hash(acknowledged, task);
85+
}
86+
87+
@Override
88+
public boolean equals(Object other) {
89+
if (this == other) {
90+
return true;
91+
}
92+
93+
if (other == null || getClass() != other.getClass()) {
94+
return false;
95+
}
96+
97+
DeleteJobResponse that = (DeleteJobResponse) other;
98+
return Objects.equals(acknowledged, that.acknowledged) && Objects.equals(task, that.task);
99+
}
100+
101+
@Override
102+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
103+
builder.startObject();
104+
if (acknowledged != null) {
105+
builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged);
106+
}
107+
if (task != null) {
108+
builder.field(TASK.getPreferredName(), task.toString());
109+
}
110+
builder.endObject();
111+
return builder;
112+
}
113+
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class Job implements ToXContentObject {
6666
public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days");
6767
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
6868
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
69+
public static final ParseField DELETING = new ParseField("deleting");
6970

7071
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);
7172

@@ -99,6 +100,7 @@ public class Job implements ToXContentObject {
99100
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
100101
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
101102
PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
103+
PARSER.declareBoolean(Builder::setDeleting, DELETING);
102104
}
103105

104106
private final String jobId;
@@ -121,13 +123,14 @@ public class Job implements ToXContentObject {
121123
private final Map<String, Object> customSettings;
122124
private final String modelSnapshotId;
123125
private final String resultsIndexName;
126+
private final Boolean deleting;
124127

125128
private Job(String jobId, String jobType, List<String> groups, String description, Date createTime,
126129
Date finishedTime, Date lastDataTime, Long establishedModelMemory,
127130
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
128131
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
129132
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
130-
String modelSnapshotId, String resultsIndexName) {
133+
String modelSnapshotId, String resultsIndexName, Boolean deleting) {
131134

132135
this.jobId = jobId;
133136
this.jobType = jobType;
@@ -148,6 +151,7 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
148151
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
149152
this.modelSnapshotId = modelSnapshotId;
150153
this.resultsIndexName = resultsIndexName;
154+
this.deleting = deleting;
151155
}
152156

153157
/**
@@ -292,6 +296,10 @@ public String getModelSnapshotId() {
292296
return modelSnapshotId;
293297
}
294298

299+
public Boolean getDeleting() {
300+
return deleting;
301+
}
302+
295303
@Override
296304
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
297305
builder.startObject();
@@ -351,6 +359,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
351359
if (resultsIndexName != null) {
352360
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
353361
}
362+
if (deleting != null) {
363+
builder.field(DELETING.getPreferredName(), deleting);
364+
}
354365
builder.endObject();
355366
return builder;
356367
}
@@ -384,15 +395,16 @@ public boolean equals(Object other) {
384395
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
385396
&& Objects.equals(this.customSettings, that.customSettings)
386397
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
387-
&& Objects.equals(this.resultsIndexName, that.resultsIndexName);
398+
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
399+
&& Objects.equals(this.deleting, that.deleting);
388400
}
389401

390402
@Override
391403
public int hashCode() {
392404
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
393405
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
394406
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
395-
modelSnapshotId, resultsIndexName);
407+
modelSnapshotId, resultsIndexName, deleting);
396408
}
397409

398410
@Override
@@ -425,6 +437,7 @@ public static class Builder {
425437
private Map<String, Object> customSettings;
426438
private String modelSnapshotId;
427439
private String resultsIndexName;
440+
private Boolean deleting;
428441

429442
private Builder() {
430443
}
@@ -453,6 +466,7 @@ public Builder(Job job) {
453466
this.customSettings = job.getCustomSettings();
454467
this.modelSnapshotId = job.getModelSnapshotId();
455468
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
469+
this.deleting = job.getDeleting();
456470
}
457471

458472
public Builder setId(String id) {
@@ -559,6 +573,11 @@ public Builder setResultsIndexName(String resultsIndexName) {
559573
return this;
560574
}
561575

576+
Builder setDeleting(Boolean deleting) {
577+
this.deleting = deleting;
578+
return this;
579+
}
580+
562581
/**
563582
* Builds a job.
564583
*
@@ -571,7 +590,7 @@ public Job build() {
571590
id, jobType, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
572591
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
573592
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
574-
modelSnapshotId, resultsIndexName);
593+
modelSnapshotId, resultsIndexName, deleting);
575594
}
576595
}
577596
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,18 @@ public void testDeleteJob() {
164164
Request request = MLRequestConverters.deleteJob(deleteJobRequest);
165165
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
166166
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId, request.getEndpoint());
167-
assertEquals(Boolean.toString(false), request.getParameters().get("force"));
167+
assertNull(request.getParameters().get("force"));
168+
assertNull(request.getParameters().get("wait_for_completion"));
168169

170+
deleteJobRequest = new DeleteJobRequest(jobId);
169171
deleteJobRequest.setForce(true);
170172
request = MLRequestConverters.deleteJob(deleteJobRequest);
171173
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
174+
175+
deleteJobRequest = new DeleteJobRequest(jobId);
176+
deleteJobRequest.setWaitForCompletion(false);
177+
request = MLRequestConverters.deleteJob(deleteJobRequest);
178+
assertEquals(Boolean.toString(false), request.getParameters().get("wait_for_completion"));
172179
}
173180

174181
public void testFlushJob() throws Exception {

client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
3434
import org.elasticsearch.client.ml.DeleteForecastRequest;
3535
import org.elasticsearch.client.ml.DeleteJobRequest;
36+
import org.elasticsearch.client.ml.DeleteJobResponse;
3637
import org.elasticsearch.client.ml.FlushJobRequest;
3738
import org.elasticsearch.client.ml.FlushJobResponse;
3839
import org.elasticsearch.client.ml.ForecastJobRequest;
@@ -151,17 +152,33 @@ public void testGetJob() throws Exception {
151152
assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2));
152153
}
153154

154-
public void testDeleteJob() throws Exception {
155+
public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception {
155156
String jobId = randomValidJobId();
156157
Job job = buildJob(jobId);
157158
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
158159
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
159160

160-
AcknowledgedResponse response = execute(new DeleteJobRequest(jobId),
161+
DeleteJobResponse response = execute(new DeleteJobRequest(jobId),
161162
machineLearningClient::deleteJob,
162163
machineLearningClient::deleteJobAsync);
163164

164-
assertTrue(response.isAcknowledged());
165+
assertTrue(response.getAcknowledged());
166+
assertNull(response.getTask());
167+
}
168+
169+
public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception {
170+
String jobId = randomValidJobId();
171+
Job job = buildJob(jobId);
172+
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
173+
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
174+
175+
DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId);
176+
deleteJobRequest.setWaitForCompletion(false);
177+
178+
DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync);
179+
180+
assertNull(response.getAcknowledged());
181+
assertNotNull(response.getTask());
165182
}
166183

167184
public void testOpenJob() throws Exception {

0 commit comments

Comments
 (0)