Skip to content

Commit f218d19

Browse files
[ML] Allow asynchronous job deletion
This changes the delete job API by adding the choice to delete a job asynchronously. The commit adds a `wait_for_completion` parameter to the delete job request. When set to `false`, the action returns immediately and the response contains the task id. This also changes the handling of subsequent delete requests for a job that is already being deleted. It now uses the task framework to check if the job is being deleted instead of the cluster state. This is a beneficial for it is going to also be working once the job configs are moved out of the cluster state and into an index. Also, force delete requests that are waiting for the job to be deleted will not proceed with the deletion if the first task fails. This will prevent overloading the cluster. Instead, the failure is communicated better via notifications so that the user may retry. Finally, this makes the `deleting` property of the job visible (also it was renamed from `deleted`). This allows a client to render a deleting job differently. Closes #32836
1 parent 609ccaa commit f218d19

File tree

30 files changed

+615
-297
lines changed

30 files changed

+615
-297
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,12 @@ static Request deleteJob(DeleteJobRequest deleteJobRequest) {
146146
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
147147

148148
RequestConverters.Params params = new RequestConverters.Params(request);
149-
params.putParam("force", Boolean.toString(deleteJobRequest.isForce()));
149+
if (deleteJobRequest.getForce() != null) {
150+
params.putParam("force", Boolean.toString(deleteJobRequest.getForce()));
151+
}
152+
if (deleteJobRequest.getWaitForCompletion() != null) {
153+
params.putParam("wait_for_completion", Boolean.toString(deleteJobRequest.getWaitForCompletion()));
154+
}
150155

151156
return request;
152157
}

client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
2727
import org.elasticsearch.client.ml.DeleteForecastRequest;
2828
import org.elasticsearch.client.ml.DeleteJobRequest;
29+
import org.elasticsearch.client.ml.DeleteJobResponse;
2930
import org.elasticsearch.client.ml.FlushJobRequest;
3031
import org.elasticsearch.client.ml.FlushJobResponse;
3132
import org.elasticsearch.client.ml.ForecastJobRequest;
@@ -207,14 +208,15 @@ public void getJobStatsAsync(GetJobStatsRequest request, RequestOptions options,
207208
*
208209
* @param request The request to delete the job
209210
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
210-
* @return action acknowledgement
211+
* @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for
212+
* completion
211213
* @throws IOException when there is a serialization issue sending the request or receiving the response
212214
*/
213-
public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
215+
public DeleteJobResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
214216
return restHighLevelClient.performRequestAndParseEntity(request,
215217
MLRequestConverters::deleteJob,
216218
options,
217-
AcknowledgedResponse::fromXContent,
219+
DeleteJobResponse::fromXContent,
218220
Collections.emptySet());
219221
}
220222

@@ -228,11 +230,11 @@ public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions o
228230
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
229231
* @param listener Listener to be notified upon request completion
230232
*/
231-
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
233+
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<DeleteJobResponse> listener) {
232234
restHighLevelClient.performRequestAsyncAndParseEntity(request,
233235
MLRequestConverters::deleteJob,
234236
options,
235-
AcknowledgedResponse::fromXContent,
237+
DeleteJobResponse::fromXContent,
236238
listener,
237239
Collections.emptySet());
238240
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
public class DeleteJobRequest extends ActionRequest {
3030

3131
private String jobId;
32-
private boolean force;
32+
private Boolean force;
33+
private Boolean waitForCompletion;
3334

3435
public DeleteJobRequest(String jobId) {
3536
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
@@ -47,7 +48,7 @@ public void setJobId(String jobId) {
4748
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
4849
}
4950

50-
public boolean isForce() {
51+
public Boolean getForce() {
5152
return force;
5253
}
5354

@@ -57,10 +58,24 @@ public boolean isForce() {
5758
*
5859
* @param force When {@code true} forcefully delete an opened job. Defaults to {@code false}
5960
*/
60-
public void setForce(boolean force) {
61+
public void setForce(Boolean force) {
6162
this.force = force;
6263
}
6364

65+
public Boolean getWaitForCompletion() {
66+
return waitForCompletion;
67+
}
68+
69+
/**
70+
* Set whether this request should wait until the operation has completed before returning
71+
* @param waitForCompletion When {@code true} the call will wait for the job deletion to complete.
72+
* Otherwise, the deletion will be executed asynchronously and the response
73+
* will contain the task id.
74+
*/
75+
public void setWaitForCompletion(Boolean waitForCompletion) {
76+
this.waitForCompletion = waitForCompletion;
77+
}
78+
6479
@Override
6580
public ActionRequestValidationException validate() {
6681
return null;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionResponse;
22+
import org.elasticsearch.common.Nullable;
23+
import org.elasticsearch.common.ParseField;
24+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
25+
import org.elasticsearch.common.xcontent.ObjectParser;
26+
import org.elasticsearch.common.xcontent.ToXContentObject;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.common.xcontent.XContentParser;
29+
import org.elasticsearch.tasks.TaskId;
30+
31+
import java.io.IOException;
32+
import java.util.Objects;
33+
34+
/**
35+
* Response object that contains the acknowledgement or the task id
36+
* depending on whether the delete job action was requested to wait for completion.
37+
*/
38+
public class DeleteJobResponse extends ActionResponse implements ToXContentObject {
39+
40+
private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
41+
private static final ParseField TASK = new ParseField("task");
42+
43+
public static final ConstructingObjectParser<DeleteJobResponse, Void> PARSER = new ConstructingObjectParser<>("delete_job_response",
44+
true, a-> new DeleteJobResponse((Boolean) a[0], (TaskId) a[1]));
45+
46+
static {
47+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED);
48+
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING);
49+
}
50+
51+
public static DeleteJobResponse fromXContent(XContentParser parser) throws IOException {
52+
return PARSER.parse(parser, null);
53+
}
54+
55+
private final Boolean acknowledged;
56+
private final TaskId task;
57+
58+
DeleteJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) {
59+
assert acknowledged != null || task != null;
60+
this.acknowledged = acknowledged;
61+
this.task = task;
62+
}
63+
64+
/**
65+
* Get the action acknowledgement
66+
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code false} or
67+
* otherwise a {@code boolean} that indicates whether the job was deleted successfully.
68+
*/
69+
public Boolean getAcknowledged() {
70+
return acknowledged;
71+
}
72+
73+
/**
74+
* Get the task id
75+
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code true} or
76+
* otherwise the id of the job deletion task.
77+
*/
78+
public TaskId getTask() {
79+
return task;
80+
}
81+
82+
@Override
83+
public int hashCode() {
84+
return Objects.hash(acknowledged, task);
85+
}
86+
87+
@Override
88+
public boolean equals(Object other) {
89+
if (this == other) {
90+
return true;
91+
}
92+
93+
if (other == null || getClass() != other.getClass()) {
94+
return false;
95+
}
96+
97+
DeleteJobResponse that = (DeleteJobResponse) other;
98+
return Objects.equals(acknowledged, that.acknowledged) && Objects.equals(task, that.task);
99+
}
100+
101+
@Override
102+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
103+
builder.startObject();
104+
if (acknowledged != null) {
105+
builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged);
106+
}
107+
if (task != null) {
108+
builder.field(TASK.getPreferredName(), task.toString());
109+
}
110+
builder.endObject();
111+
return builder;
112+
}
113+
}

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public class Job implements ToXContentObject {
6666
public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days");
6767
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
6868
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
69+
public static final ParseField DELETING = new ParseField("deleting");
6970

7071
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);
7172

@@ -99,6 +100,7 @@ public class Job implements ToXContentObject {
99100
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
100101
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
101102
PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
103+
PARSER.declareBoolean(Builder::setDeleting, DELETING);
102104
}
103105

104106
private final String jobId;
@@ -121,13 +123,14 @@ public class Job implements ToXContentObject {
121123
private final Map<String, Object> customSettings;
122124
private final String modelSnapshotId;
123125
private final String resultsIndexName;
126+
private final Boolean deleting;
124127

125128
private Job(String jobId, String jobType, List<String> groups, String description, Date createTime,
126129
Date finishedTime, Date lastDataTime, Long establishedModelMemory,
127130
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
128131
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
129132
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
130-
String modelSnapshotId, String resultsIndexName) {
133+
String modelSnapshotId, String resultsIndexName, Boolean deleting) {
131134

132135
this.jobId = jobId;
133136
this.jobType = jobType;
@@ -148,6 +151,7 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
148151
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
149152
this.modelSnapshotId = modelSnapshotId;
150153
this.resultsIndexName = resultsIndexName;
154+
this.deleting = deleting;
151155
}
152156

153157
/**
@@ -292,6 +296,10 @@ public String getModelSnapshotId() {
292296
return modelSnapshotId;
293297
}
294298

299+
public Boolean getDeleting() {
300+
return deleting;
301+
}
302+
295303
@Override
296304
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
297305
builder.startObject();
@@ -351,6 +359,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
351359
if (resultsIndexName != null) {
352360
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
353361
}
362+
if (deleting != null) {
363+
builder.field(DELETING.getPreferredName(), deleting);
364+
}
354365
builder.endObject();
355366
return builder;
356367
}
@@ -384,15 +395,16 @@ public boolean equals(Object other) {
384395
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
385396
&& Objects.equals(this.customSettings, that.customSettings)
386397
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
387-
&& Objects.equals(this.resultsIndexName, that.resultsIndexName);
398+
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
399+
&& Objects.equals(this.deleting, that.deleting);
388400
}
389401

390402
@Override
391403
public int hashCode() {
392404
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
393405
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
394406
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
395-
modelSnapshotId, resultsIndexName);
407+
modelSnapshotId, resultsIndexName, deleting);
396408
}
397409

398410
@Override
@@ -425,6 +437,7 @@ public static class Builder {
425437
private Map<String, Object> customSettings;
426438
private String modelSnapshotId;
427439
private String resultsIndexName;
440+
private Boolean deleting;
428441

429442
private Builder() {
430443
}
@@ -453,6 +466,7 @@ public Builder(Job job) {
453466
this.customSettings = job.getCustomSettings();
454467
this.modelSnapshotId = job.getModelSnapshotId();
455468
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
469+
this.deleting = job.getDeleting();
456470
}
457471

458472
public Builder setId(String id) {
@@ -559,6 +573,11 @@ public Builder setResultsIndexName(String resultsIndexName) {
559573
return this;
560574
}
561575

576+
public Builder setDeleting(Boolean deleting) {
577+
this.deleting = deleting;
578+
return this;
579+
}
580+
562581
/**
563582
* Builds a job.
564583
*
@@ -571,7 +590,7 @@ public Job build() {
571590
id, jobType, groups, description, createTime, finishedTime, lastDataTime, establishedModelMemory,
572591
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
573592
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
574-
modelSnapshotId, resultsIndexName);
593+
modelSnapshotId, resultsIndexName, deleting);
575594
}
576595
}
577596
}

client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
3434
import org.elasticsearch.client.ml.DeleteForecastRequest;
3535
import org.elasticsearch.client.ml.DeleteJobRequest;
36+
import org.elasticsearch.client.ml.DeleteJobResponse;
3637
import org.elasticsearch.client.ml.FlushJobRequest;
3738
import org.elasticsearch.client.ml.FlushJobResponse;
3839
import org.elasticsearch.client.ml.ForecastJobRequest;
@@ -142,17 +143,33 @@ public void testGetJob() throws Exception {
142143
assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2));
143144
}
144145

145-
public void testDeleteJob() throws Exception {
146+
public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception {
146147
String jobId = randomValidJobId();
147148
Job job = buildJob(jobId);
148149
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
149150
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
150151

151-
AcknowledgedResponse response = execute(new DeleteJobRequest(jobId),
152+
DeleteJobResponse response = execute(new DeleteJobRequest(jobId),
152153
machineLearningClient::deleteJob,
153154
machineLearningClient::deleteJobAsync);
154155

155-
assertTrue(response.isAcknowledged());
156+
assertTrue(response.getAcknowledged());
157+
assertNull(response.getTask());
158+
}
159+
160+
public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception {
161+
String jobId = randomValidJobId();
162+
Job job = buildJob(jobId);
163+
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
164+
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
165+
166+
DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId);
167+
deleteJobRequest.setWaitForCompletion(false);
168+
169+
DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync);
170+
171+
assertNull(response.getAcknowledged());
172+
assertNotNull(response.getTask());
156173
}
157174

158175
public void testOpenJob() throws Exception {

0 commit comments

Comments
 (0)