Skip to content

Commit 2ad8c9f

Browse files
Refactor to use listeners
1 parent 2ff5c9d commit 2ad8c9f

File tree

1 file changed

+44
-81
lines changed

1 file changed

+44
-81
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 44 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,9 @@
66
package org.elasticsearch.xpack.ml.action;
77

88
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
9-
import org.elasticsearch.ElasticsearchException;
109
import org.elasticsearch.ElasticsearchStatusException;
1110
import org.elasticsearch.ResourceNotFoundException;
1211
import org.elasticsearch.action.ActionListener;
13-
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
14-
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
15-
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
1612
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1713
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
1814
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
@@ -28,7 +24,6 @@
2824
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2925
import org.elasticsearch.client.Client;
3026
import org.elasticsearch.client.ParentTaskAssigningClient;
31-
import org.elasticsearch.client.Requests;
3227
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
3328
import org.elasticsearch.cluster.ClusterState;
3429
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -39,13 +34,9 @@
3934
import org.elasticsearch.cluster.metadata.MetaData;
4035
import org.elasticsearch.cluster.service.ClusterService;
4136
import org.elasticsearch.common.CheckedConsumer;
42-
import org.elasticsearch.common.bytes.BytesReference;
37+
import org.elasticsearch.common.Nullable;
4338
import org.elasticsearch.common.inject.Inject;
4439
import org.elasticsearch.common.settings.Settings;
45-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
46-
import org.elasticsearch.common.xcontent.DeprecationHandler;
47-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
48-
import org.elasticsearch.common.xcontent.XContentParser;
4940
import org.elasticsearch.index.IndexNotFoundException;
5041
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
5142
import org.elasticsearch.index.query.IdsQueryBuilder;
@@ -61,7 +52,6 @@
6152
import org.elasticsearch.tasks.TaskId;
6253
import org.elasticsearch.threadpool.ThreadPool;
6354
import org.elasticsearch.transport.TransportService;
64-
import org.elasticsearch.xpack.core.ml.MachineLearningField;
6555
import org.elasticsearch.xpack.core.ml.MlMetadata;
6656
import org.elasticsearch.xpack.core.ml.MlTasks;
6757
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -72,7 +62,6 @@
7262
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
7363
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
7464
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
75-
import org.elasticsearch.xpack.core.ml.job.persistence.JobDeletionTask;
7665
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
7766
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
7867
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
@@ -81,12 +70,11 @@
8170
import org.elasticsearch.xpack.ml.notifications.Auditor;
8271
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
8372

84-
import java.io.IOException;
85-
import java.io.InputStream;
8673
import java.util.ArrayList;
74+
import java.util.HashMap;
8775
import java.util.HashSet;
8876
import java.util.List;
89-
import java.util.Optional;
77+
import java.util.Map;
9078
import java.util.Set;
9179
import java.util.function.Consumer;
9280

@@ -102,6 +90,14 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
10290
private final Auditor auditor;
10391
private final JobResultsProvider jobResultsProvider;
10492

93+
/**
94+
* A map of task listeners by job_id.
95+
* Subsequent delete requests store their listeners in the corresponding list in this map
96+
* and wait to be notified when the first deletion task completes.
97+
* This is guarded by synchronizing on its lock.
98+
*/
99+
private final Map<String, List<ActionListener<AcknowledgedResponse>>> listenersByJobId;
100+
105101
@Inject
106102
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
107103
ThreadPool threadPool, ActionFilters actionFilters,
@@ -113,6 +109,7 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
113109
this.persistentTasksService = persistentTasksService;
114110
this.auditor = auditor;
115111
this.jobResultsProvider = jobResultsProvider;
112+
this.listenersByJobId = new HashMap<>();
116113
}
117114

118115
@Override
@@ -144,92 +141,58 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
144141
ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
145142

146143
// Check if there is a deletion task for this job already and if yes wait for it to complete
147-
Optional<Task> existingStartedTask;
148-
synchronized (this) {
149-
existingStartedTask = findExistingStartedTask(task);
150-
if (existingStartedTask.isPresent() == false) {
151-
((JobDeletionTask) task).start();
144+
synchronized (listenersByJobId) {
145+
if (listenersByJobId.containsKey(request.getJobId())) {
146+
logger.debug("[{}] Deletion task [{}] will wait for existing deletion task to complete",
147+
request.getJobId(), task.getId());
148+
listenersByJobId.get(request.getJobId()).add(listener);
149+
return;
150+
} else {
151+
List<ActionListener<AcknowledgedResponse>> listeners = new ArrayList<>();
152+
listeners.add(listener);
153+
listenersByJobId.put(request.getJobId(), listeners);
152154
}
153155
}
154-
if (existingStartedTask.isPresent()) {
155-
logger.debug("[{}] Deletion task [{}] will wait for existing deletion task [{}]", request.getJobId(), task.getId(),
156-
existingStartedTask.get().getId());
157-
TaskId existingTaskId = new TaskId(clusterService.localNode().getId(), existingStartedTask.get().getId());
158-
waitForExistingTaskToComplete(parentTaskClient, request.getJobId(), existingTaskId, listener);
159-
return;
160-
}
161156

162157
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
163158

159+
// The listener that will be executed at the end of the chain will notify all listeners
160+
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
161+
ack -> notifyListeners(request.getJobId(), ack, null),
162+
e -> notifyListeners(request.getJobId(), null, e)
163+
);
164+
164165
ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
165166
response -> {
166167
if (request.isForce()) {
167-
forceDeleteJob(parentTaskClient, request, listener);
168+
forceDeleteJob(parentTaskClient, request, finalListener);
168169
} else {
169-
normalDeleteJob(parentTaskClient, request, listener);
170+
normalDeleteJob(parentTaskClient, request, finalListener);
170171
}
171172
},
172173
e -> {
173174
auditor.error(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING_FAILED, e.getMessage()));
174-
listener.onFailure(e);
175+
finalListener.onFailure(e);
175176
});
176177

177178
markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce());
178179
}
179180

180-
private Optional<Task> findExistingStartedTask(Task currentTask) {
181-
return taskManager.getTasks().values().stream().filter(filteredTask ->
182-
currentTask.getDescription().equals(filteredTask.getDescription()) &&
183-
currentTask.getId() != filteredTask.getId()
184-
&& ((JobDeletionTask) filteredTask).isStarted()).findFirst();
185-
}
186-
187-
private void waitForExistingTaskToComplete(ParentTaskAssigningClient parentTaskClient, String jobId, TaskId taskId,
188-
ActionListener<AcknowledgedResponse> listener) {
189-
threadPool.generic().execute(new AbstractRunnable() {
190-
@Override
191-
public void onFailure(Exception e) {
192-
listener.onFailure(e);
181+
private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) {
182+
synchronized (listenersByJobId) {
183+
List<ActionListener<AcknowledgedResponse>> listeners = listenersByJobId.remove(jobId);
184+
if (listeners == null) {
185+
logger.error("[{}] No deletion job listeners could be found");
186+
return;
193187
}
194-
195-
@Override
196-
protected void doRun() {
197-
GetTaskRequest getTaskRequest = new GetTaskRequest();
198-
getTaskRequest.setTaskId(taskId);
199-
getTaskRequest.setWaitForCompletion(true);
200-
getTaskRequest.setTimeout(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT);
201-
executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, GetTaskAction.INSTANCE, getTaskRequest,
202-
new ActionListener<GetTaskResponse>() {
203-
204-
@Override
205-
public void onResponse(GetTaskResponse getTaskResponse) {
206-
if (getTaskResponse.getTask().getError() != null) {
207-
BytesReference taskErrorBytes = getTaskResponse.getTask().getError();
208-
try (InputStream stream = taskErrorBytes.streamInput();
209-
XContentParser parser = Requests.INDEX_CONTENT_TYPE.xContent().createParser
210-
(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
211-
// Skip the START_OBJECT token
212-
parser.nextToken();
213-
ElasticsearchException taskError = ElasticsearchException.fromXContent(parser);
214-
onFailure(taskError);
215-
} catch (IOException e) {
216-
onFailure(new ElasticsearchException("Could not parse task error: "
217-
+ taskErrorBytes.utf8ToString()));
218-
}
219-
} else {
220-
logger.debug("[{}] Finished waiting for completion of task [{}]", jobId, taskId);
221-
listener.onResponse(new AcknowledgedResponse(true));
222-
}
223-
}
224-
225-
@Override
226-
public void onFailure(Exception e) {
227-
logger.error("[" + jobId + "] Error while waiting for job deletion task to complete", e);
228-
listener.onFailure(e);
229-
}
230-
});
188+
for (ActionListener<AcknowledgedResponse> listener : listeners) {
189+
if (error != null) {
190+
listener.onFailure(error);
191+
} else {
192+
listener.onResponse(ack);
193+
}
231194
}
232-
});
195+
}
233196
}
234197

235198
private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,

0 commit comments

Comments
 (0)