Skip to content

Commit b487990

Browse files
authored
Make MlDailyMaintenanceService delete jobs that are in deleting state anyway (#60121)
1 parent bbfba16 commit b487990

File tree

8 files changed

+455
-58
lines changed

8 files changed

+455
-58
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
2525

2626
public static final DeleteDataFrameAnalyticsAction INSTANCE = new DeleteDataFrameAnalyticsAction();
2727
public static final String NAME = "cluster:admin/xpack/ml/data_frame/analytics/delete";
28+
public static final String DELETION_TASK_DESCRIPTION_PREFIX = "delete-analytics-";
2829

2930
private DeleteDataFrameAnalyticsAction() {
3031
super(NAME, AcknowledgedResponse::new);
@@ -77,6 +78,11 @@ public ActionRequestValidationException validate() {
7778
return null;
7879
}
7980

81+
@Override
82+
public String getDescription() {
83+
return DELETION_TASK_DESCRIPTION_PREFIX + id;
84+
}
85+
8086
@Override
8187
public boolean equals(Object o) {
8288
if (this == o) return true;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,17 @@
1111
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14-
import org.elasticsearch.tasks.Task;
15-
import org.elasticsearch.tasks.TaskId;
1614
import org.elasticsearch.xpack.core.ml.job.config.Job;
17-
import org.elasticsearch.xpack.core.ml.job.persistence.JobDeletionTask;
1815
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
1916

2017
import java.io.IOException;
21-
import java.util.Map;
2218
import java.util.Objects;
2319

2420
public class DeleteJobAction extends ActionType<AcknowledgedResponse> {
2521

2622
public static final DeleteJobAction INSTANCE = new DeleteJobAction();
2723
public static final String NAME = "cluster:admin/xpack/ml/job/delete";
24+
public static final String DELETION_TASK_DESCRIPTION_PREFIX = "delete-job-";
2825

2926
private DeleteJobAction() {
3027
super(NAME, AcknowledgedResponse::new);
@@ -84,8 +81,8 @@ public ActionRequestValidationException validate() {
8481
}
8582

8683
@Override
87-
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
88-
return new JobDeletionTask(id, type, action, "delete-job-" + jobId, parentTaskId, headers);
84+
public String getDescription() {
85+
return DELETION_TASK_DESCRIPTION_PREFIX + jobId;
8986
}
9087

9188
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/DataDescription.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,19 +318,22 @@ public static class Builder {
318318
private Character fieldDelimiter;
319319
private Character quoteCharacter;
320320

321-
public void setFormat(DataFormat format) {
321+
public Builder setFormat(DataFormat format) {
322322
dataFormat = ExceptionsHelper.requireNonNull(format, FORMAT_FIELD.getPreferredName() + " must not be null");
323+
return this;
323324
}
324325

325-
private void setFormat(String format) {
326+
private Builder setFormat(String format) {
326327
setFormat(DataFormat.forString(format));
328+
return this;
327329
}
328330

329-
public void setTimeField(String fieldName) {
331+
public Builder setTimeField(String fieldName) {
330332
timeFieldName = ExceptionsHelper.requireNonNull(fieldName, TIME_FIELD_NAME_FIELD.getPreferredName() + " must not be null");
333+
return this;
331334
}
332335

333-
public void setTimeFormat(String format) {
336+
public Builder setTimeFormat(String format) {
334337
ExceptionsHelper.requireNonNull(format, TIME_FORMAT_FIELD.getPreferredName() + " must not be null");
335338
switch (format) {
336339
case EPOCH:
@@ -345,14 +348,17 @@ public void setTimeFormat(String format) {
345348
}
346349
}
347350
timeFormat = format;
351+
return this;
348352
}
349353

350-
public void setFieldDelimiter(Character delimiter) {
354+
public Builder setFieldDelimiter(Character delimiter) {
351355
fieldDelimiter = delimiter;
356+
return this;
352357
}
353358

354-
public void setQuoteCharacter(Character value) {
359+
public Builder setQuoteCharacter(Character value) {
355360
quoteCharacter = value;
361+
return this;
356362
}
357363

358364
public DataDescription build() {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Collections;
2727

2828
import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation;
29+
import static org.hamcrest.Matchers.containsString;
2930

3031
public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
3132

@@ -78,6 +79,13 @@ public void testDeleteJobDeletesAnnotations() throws Exception {
7879
assertThatNumberOfAnnotationsIsEqualTo(2);
7980
}
8081

82+
public void testDeletingMultipleJobsInOneRequestIsImpossible() {
83+
String jobIdA = "delete-multiple-jobs-a";
84+
String jobIdB = "delete-multiple-jobs-b";
85+
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> deleteJob(jobIdA + "," + jobIdB));
86+
assertThat(e.getMessage(), containsString("Invalid job_id"));
87+
}
88+
8189
private void runJob(String jobId, String datafeedId) throws Exception {
8290
Detector.Builder detector = new Detector.Builder().setFunction("count");
8391
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.integration;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.cluster.ClusterName;
11+
import org.elasticsearch.cluster.service.ClusterService;
12+
import org.elasticsearch.common.unit.TimeValue;
13+
import org.elasticsearch.common.util.concurrent.EsExecutors;
14+
import org.elasticsearch.threadpool.ThreadPool;
15+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
16+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
17+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
18+
import org.elasticsearch.xpack.core.ml.job.config.Job;
19+
import org.elasticsearch.xpack.ml.MlAssignmentNotifier;
20+
import org.elasticsearch.xpack.ml.MlDailyMaintenanceService;
21+
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
22+
import org.junit.Before;
23+
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Set;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.function.Consumer;
31+
32+
import static java.util.stream.Collectors.toSet;
33+
import static org.hamcrest.Matchers.containsInAnyOrder;
34+
import static org.hamcrest.Matchers.is;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.when;
37+
38+
public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase {
39+
40+
private JobConfigProvider jobConfigProvider;
41+
private ThreadPool threadPool;
42+
43+
@Before
44+
public void setUpMocks() {
45+
jobConfigProvider = new JobConfigProvider(client(), xContentRegistry());
46+
threadPool = mock(ThreadPool.class);
47+
ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
48+
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(directExecutorService);
49+
}
50+
51+
public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws InterruptedException {
52+
MlDailyMaintenanceService maintenanceService =
53+
new MlDailyMaintenanceService(
54+
settings(Version.CURRENT).build(),
55+
ClusterName.DEFAULT,
56+
threadPool,
57+
client(),
58+
mock(ClusterService.class),
59+
mock(MlAssignmentNotifier.class));
60+
61+
putJob("maintenance-test-1");
62+
putJob("maintenance-test-2");
63+
putJob("maintenance-test-3");
64+
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
65+
66+
blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask);
67+
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
68+
69+
this.<Boolean>blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-2", listener));
70+
this.<Boolean>blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-3", listener));
71+
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
72+
assertThat(getJob("maintenance-test-1").get(0).isDeleting(), is(false));
73+
assertThat(getJob("maintenance-test-2").get(0).isDeleting(), is(true));
74+
assertThat(getJob("maintenance-test-3").get(0).isDeleting(), is(true));
75+
76+
blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask);
77+
assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1"));
78+
}
79+
80+
private <T> void blockingCall(Consumer<ActionListener<T>> function) throws InterruptedException {
81+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
82+
CountDownLatch latch = new CountDownLatch(1);
83+
ActionListener<T> listener = ActionListener.wrap(
84+
r -> {
85+
latch.countDown();
86+
},
87+
e -> {
88+
exceptionHolder.set(e);
89+
latch.countDown();
90+
}
91+
);
92+
function.accept(listener);
93+
latch.await();
94+
if (exceptionHolder.get() != null) {
95+
fail(exceptionHolder.get().getMessage());
96+
}
97+
}
98+
99+
private void putJob(String jobId) {
100+
Job.Builder job =
101+
new Job.Builder(jobId)
102+
.setAnalysisConfig(
103+
new AnalysisConfig.Builder((List<Detector>) null)
104+
.setBucketSpan(TimeValue.timeValueHours(1))
105+
.setDetectors(
106+
Collections.singletonList(
107+
new Detector.Builder("count", null)
108+
.setPartitionFieldName("user")
109+
.build())))
110+
.setDataDescription(
111+
new DataDescription.Builder()
112+
.setTimeFormat("epoch"));
113+
114+
registerJob(job);
115+
putJob(job);
116+
}
117+
118+
private Set<String> getJobIds() {
119+
return getJob("*").stream().map(Job::getId).collect(toSet());
120+
}
121+
}

0 commit comments

Comments
 (0)