Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.task.JobTask;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

Expand All @@ -41,18 +40,15 @@ public class TransportKillProcessAction extends TransportTasksAction<JobTask,
private static final Logger logger = LogManager.getLogger(TransportKillProcessAction.class);

private final AnomalyDetectionAuditor auditor;
private final AutodetectProcessManager processManager;

@Inject
public TransportKillProcessAction(TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
AutodetectProcessManager processManager,
AnomalyDetectionAuditor auditor) {
super(KillProcessAction.NAME, clusterService, transportService, actionFilters, KillProcessAction.Request::new,
KillProcessAction.Response::new, KillProcessAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.auditor = auditor;
this.processManager = processManager;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,30 +529,29 @@ public void onFailure(Exception e) {
protected void doRun() {
ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId());
if (processContext == null) {
logger.debug("Aborted opening job [{}] as it has been closed", job.getId());
logger.debug("Aborted opening job [{}] as it has been closed or killed", job.getId());
return;
}
// We check again after the process state is locked to ensure no race conditions are hit.
if (processContext.getJobTask().isClosing()) {
logger.debug("Aborted opening job [{}] as it is being closed", job.getId());
logger.debug("Aborted opening job [{}] as it is being closed (before starting process)", job.getId());
jobTask.markAsCompleted();
return;
}

try {
if (createProcessAndSetRunning(processContext, job, params, closeHandler)) {
// This next check also covers the case of a process being killed while it was being started.
// It relies on callers setting the closing flag on the job task before calling this method.
// It also relies on the fact that at this stage of the process lifecycle kill and close are
// basically identical, i.e. the process has done so little work that making it exit by closing
// its input stream will not result in side effects.
if (processContext.getJobTask().isClosing()) {
logger.debug("Aborted opening job [{}] as it is being closed", job.getId());
logger.debug("Aborted opening job [{}] as it is being closed or killed (after starting process)",
job.getId());
closeProcessAndTask(processContext, jobTask, "job is already closing");
return;
}
// It is possible that a `kill` request came in before the communicator was set
// This means that the kill was not handled appropriately and we continued down this execution path
if (processContext.shouldBeKilled()) {
logger.debug("Aborted opening job [{}] as it is being killed", job.getId());
processContext.killIt();
return;
}
processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
setJobState(jobTask, JobState.OPENED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ final class ProcessContext {
private final JobTask jobTask;
private volatile AutodetectCommunicator autodetectCommunicator;
private volatile ProcessState state;
private volatile KillBuilder latestKillRequest = null;

ProcessContext(JobTask jobTask) {
this.jobTask = jobTask;
Expand All @@ -47,17 +46,6 @@ private void setAutodetectCommunicator(AutodetectCommunicator autodetectCommunic
this.autodetectCommunicator = autodetectCommunicator;
}

boolean shouldBeKilled() {
return latestKillRequest != null;
}

void killIt() {
if (latestKillRequest == null) {
throw new IllegalArgumentException("Unable to kill job as previous request is not completed");
}
latestKillRequest.kill();
}

ProcessStateName getState() {
return state.getName();
}
Expand Down Expand Up @@ -129,7 +117,12 @@ KillBuilder setShouldFinalizeJob(boolean shouldFinalizeJob) {

void kill() {
if (autodetectCommunicator == null) {
latestKillRequest = this;
// Killing a connected process would also complete the persistent task if `finish` was true,
// so we should do the same here even though the process wasn't yet connected at the time of
// the kill
if (finish) {
jobTask.markAsCompleted();
}
return;
}
String jobId = jobTask.getJobId();
Expand Down