Skip to content

Commit 2ff5c9d

Browse files
Synchronize search of existing tasks and mark started
1 parent f218d19 commit 2ff5c9d

File tree

5 files changed

+45
-18
lines changed

5 files changed

+45
-18
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,18 @@ public void testDeleteJob() {
162162
Request request = MLRequestConverters.deleteJob(deleteJobRequest);
163163
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
164164
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId, request.getEndpoint());
165-
assertEquals(Boolean.toString(false), request.getParameters().get("force"));
165+
assertNull(request.getParameters().get("force"));
166+
assertNull(request.getParameters().get("wait_for_completion"));
166167

168+
deleteJobRequest = new DeleteJobRequest(jobId);
167169
deleteJobRequest.setForce(true);
168170
request = MLRequestConverters.deleteJob(deleteJobRequest);
169171
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
172+
173+
deleteJobRequest = new DeleteJobRequest(jobId);
174+
deleteJobRequest.setWaitForCompletion(false);
175+
request = MLRequestConverters.deleteJob(deleteJobRequest);
176+
assertEquals(Boolean.toString(false), request.getParameters().get("wait_for_completion"));
170177
}
171178

172179
public void testFlushJob() throws Exception {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,17 @@
1212

1313
public class JobDeletionTask extends Task {
1414

15+
private volatile boolean started;
16+
1517
public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers) {
1618
super(id, type, action, description, parentTask, headers);
1719
}
20+
21+
public void start() {
22+
started = true;
23+
}
24+
25+
public boolean isStarted() {
26+
return started;
27+
}
1828
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/MlFilterTests.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
package org.elasticsearch.xpack.core.ml.job.config;
77

88
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
9-
import org.apache.http.util.EntityUtils;
109
import org.elasticsearch.ElasticsearchStatusException;
11-
import org.elasticsearch.client.Response;
1210
import org.elasticsearch.common.io.stream.Writeable.Reader;
1311
import org.elasticsearch.common.xcontent.XContentParser;
1412
import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -19,8 +17,6 @@
1917
import java.util.List;
2018
import java.util.SortedSet;
2119
import java.util.TreeSet;
22-
import java.util.regex.Matcher;
23-
import java.util.regex.Pattern;
2420

2521
import static org.hamcrest.Matchers.contains;
2622
import static org.hamcrest.Matchers.containsString;

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.xpack.ml.MachineLearning;
2222
import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
2323
import org.junit.After;
24-
import org.junit.Ignore;
2524

2625
import java.io.IOException;
2726
import java.util.Locale;
@@ -562,9 +561,8 @@ public void testMultiIndexDelete() throws Exception {
562561
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
563562
}
564563

565-
@Ignore
566564
public void testDelete_multipleRequest() throws Exception {
567-
String jobId = "delete-job-mulitple-times";
565+
String jobId = "delete-job-multiple-times";
568566
createFarequoteJob(jobId);
569567

570568
ConcurrentMapLong<Response> responses = ConcurrentCollections.newConcurrentMapLong();
@@ -575,8 +573,8 @@ public void testDelete_multipleRequest() throws Exception {
575573
AtomicReference<ResponseException> recreationException = new AtomicReference<>();
576574

577575
Runnable deleteJob = () -> {
576+
boolean forceDelete = randomBoolean();
578577
try {
579-
boolean forceDelete = randomBoolean();
580578
String url = MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId;
581579
if (forceDelete) {
582580
url += "?force=true";
@@ -597,6 +595,7 @@ public void testDelete_multipleRequest() throws Exception {
597595
} catch (ResponseException re) {
598596
recreationException.set(re);
599597
} catch (IOException e) {
598+
logger.error("Error trying to recreate the job", e);
600599
ioe.set(e);
601600
}
602601
}
@@ -606,14 +605,14 @@ public void testDelete_multipleRequest() throws Exception {
606605
// the other to complete. This is difficult to schedule but
607606
// hopefully it will happen in CI
608607
int numThreads = 5;
609-
Thread [] threads = new Thread[numThreads];
610-
for (int i=0; i<numThreads; i++) {
608+
Thread[] threads = new Thread[numThreads];
609+
for (int i = 0; i < numThreads; i++) {
611610
threads[i] = new Thread(deleteJob);
612611
}
613-
for (int i=0; i<numThreads; i++) {
612+
for (int i = 0; i < numThreads; i++) {
614613
threads[i].start();
615614
}
616-
for (int i=0; i<numThreads; i++) {
615+
for (int i = 0; i < numThreads; i++) {
617616
threads[i].join();
618617
}
619618

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
7373
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
7474
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
75+
import org.elasticsearch.xpack.core.ml.job.persistence.JobDeletionTask;
7576
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
7677
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
7778
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
@@ -143,11 +144,17 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
143144
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
144145

145146
// Check if there is a deletion task for this job already and if yes wait for it to complete
146-
Optional<Task> existingTask = taskManager.getTasks().values().stream().filter(searchedTask -> task.getDescription().equals(
147-
searchedTask.getDescription()) && task.getId() != searchedTask.getId()).findFirst();
148-
if (existingTask.isPresent()) {
149-
logger.debug("Found existing deletion task for job [{}]", request.getJobId());
150-
TaskId existingTaskId = new TaskId(clusterService.localNode().getId(), existingTask.get().getId());
147+
Optional<Task> existingStartedTask;
148+
synchronized (this) {
149+
existingStartedTask = findExistingStartedTask(task);
150+
if (existingStartedTask.isPresent() == false) {
151+
((JobDeletionTask) task).start();
152+
}
153+
}
154+
if (existingStartedTask.isPresent()) {
155+
logger.debug("[{}] Deletion task [{}] will wait for existing deletion task [{}]", request.getJobId(), task.getId(),
156+
existingStartedTask.get().getId());
157+
TaskId existingTaskId = new TaskId(clusterService.localNode().getId(), existingStartedTask.get().getId());
151158
waitForExistingTaskToComplete(parentTaskClient, request.getJobId(), existingTaskId, listener);
152159
return;
153160
}
@@ -170,6 +177,13 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
170177
markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
171178
}
172179

180+
private Optional<Task> findExistingStartedTask(Task currentTask) {
181+
return taskManager.getTasks().values().stream().filter(filteredTask ->
182+
currentTask.getDescription().equals(filteredTask.getDescription()) &&
183+
currentTask.getId() != filteredTask.getId()
184+
&& ((JobDeletionTask) filteredTask).isStarted()).findFirst();
185+
}
186+
173187
private void waitForExistingTaskToComplete(ParentTaskAssigningClient parentTaskClient, String jobId, TaskId taskId,
174188
ActionListener<AcknowledgedResponse> listener) {
175189
threadPool.generic().execute(new AbstractRunnable() {
@@ -203,6 +217,7 @@ public void onResponse(GetTaskResponse getTaskResponse) {
203217
+ taskErrorBytes.utf8ToString()));
204218
}
205219
} else {
220+
logger.debug("[{}] Finished waiting for completion of task [{}]", jobId, taskId);
206221
listener.onResponse(new AcknowledgedResponse(true));
207222
}
208223
}

0 commit comments

Comments
 (0)