diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
index 464703f4227ab..121575e50190f 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
@@ -688,7 +688,6 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
         }
         return new AutodetectCommunicator(job, process, new StateStreamer(client), dataCountsReporter, processor, handler,
             xContentRegistry, autodetectWorkerExecutor);
-
     }
 
     private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) {
@@ -735,6 +734,12 @@ private Consumer<String> onProcessCrash(JobTask jobTask) {
     private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask, String reason) {
         String jobId = jobTask.getJobId();
         long allocationId = jobTask.getAllocationId();
+        // We use a lock to prevent simultaneous open and close from conflicting. However, we found
+        // that we could not use the lock to stop kill from conflicting because that could lead to
+        // a kill taking an unacceptably long time to have an effect, which largely defeats the point
+        // of having an option to quickly kill a process. Therefore we have to deal with the effects
+        // of kill running simultaneously with open and close.
+        boolean jobKilled = false;
         processContext.tryLock();
         try {
             if (processContext.setDying() == false) {
@@ -744,7 +749,12 @@ private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask,
                 return;
             }
 
-            if (reason == null) {
+            // If the job was killed early on during its open sequence then
+            // its context will already have been removed from this map
+            jobKilled = (processByAllocation.containsKey(allocationId) == false);
+            if (jobKilled) {
+                logger.debug("[{}] Cleaning up job opened after kill", jobId);
+            } else if (reason == null) {
                 logger.info("Closing job [{}]", jobId);
             } else {
                 logger.info("Closing job [{}], because [{}]", jobId, reason);
@@ -752,21 +762,34 @@ private void closeProcessAndTask(ProcessContext processContext, JobTask jobTask,
 
             AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
             if (communicator == null) {
+                assert jobKilled == false
+                    : "Job " + jobId + " killed before process started yet still had no communicator during cleanup after process started";
                 logger.debug("Job [{}] is being closed before its process is started", jobId);
                 jobTask.markAsCompleted();
+                processByAllocation.remove(allocationId);
             } else {
-                communicator.close();
+                if (jobKilled) {
+                    communicator.killProcess(true, false, false);
+                } else {
+                    // communicator.close() may take a long time to run, if the job persists a large model state as a
+                    // result of calling it. We want to leave open the option to kill the job during this time, which
+                    // is why the allocation ID must remain in the map until after the close is complete.
+                    communicator.close();
+                    processByAllocation.remove(allocationId);
+                }
             }
-
-            processByAllocation.remove(allocationId);
         } catch (Exception e) {
-            // If the close failed because the process has explicitly been killed by us then just pass on that exception
+            // If the close failed because the process has explicitly been killed by us then just pass on that exception.
+            // (Note that jobKilled may be false in this case, if the kill is executed while communicator.close() is running.)
             if (e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException) e).status() == RestStatus.CONFLICT) {
-                throw e;
+                logger.trace("[{}] Conflict between kill and close during autodetect process cleanup - job {} before cleanup started",
+                    jobId, jobKilled ? "killed" : "not killed");
+                throw (ElasticsearchStatusException) e;
            }
-            logger.warn("[" + jobId + "] Exception closing autodetect process", e);
+            String msg = jobKilled ? "Exception cleaning up autodetect process started after kill" : "Exception closing autodetect process";
+            logger.warn("[" + jobId + "] " + msg, e);
             setJobState(jobTask, JobState.FAILED, e.getMessage());
-            throw ExceptionsHelper.serverError("Exception closing autodetect process", e);
+            throw ExceptionsHelper.serverError(msg, e);
         } finally {
             // to ensure the contract that multiple simultaneous close calls for the same job wait until
             // the job is closed is honoured, hold the lock throughout the close procedure so that another
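The comments in the diff describe a deliberate asymmetry: open and close serialize on a per-process lock, kill bypasses that lock so it takes effect quickly, and the allocation ID stays in processByAllocation until close completes so a concurrent kill can still reach the process. The following self-contained Java sketch (hypothetical class and method names, not the actual Elasticsearch implementation) illustrates that pattern under those assumptions:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the concurrency pattern described in the diff comments: open and close
// serialize on a lock, kill deliberately does NOT take the lock so it stays fast, and the
// process stays registered in the map until close finishes so a concurrent kill can find it.
public final class KillableProcessRegistry {

    /** Stand-in for the real native process handle. */
    static final class Process {
        volatile boolean killed;

        void closeGracefully() throws InterruptedException {
            // Simulates a potentially slow close, e.g. persisting a large model state.
            Thread.sleep(100);
            if (killed) {
                // Mirrors the close call failing with a conflict when killed mid-close.
                throw new IllegalStateException("process was killed while closing");
            }
        }

        void kill() {
            killed = true; // takes effect immediately, even while a close is in progress
        }
    }

    private final Map<Long, Process> processByAllocation = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    void open(long allocationId) {
        lock.lock();
        try {
            processByAllocation.put(allocationId, new Process());
        } finally {
            lock.unlock();
        }
    }

    void close(long allocationId) throws InterruptedException {
        lock.lock(); // serializes with open() and other close() calls
        try {
            Process process = processByAllocation.get(allocationId);
            if (process == null) {
                return; // already killed or never opened
            }
            // The entry stays in the map until the close completes, so kill() can still interrupt it.
            process.closeGracefully();
            processByAllocation.remove(allocationId);
        } finally {
            lock.unlock();
        }
    }

    void kill(long allocationId) {
        // No lock here: waiting behind a slow close would defeat the point of a fast kill.
        Process process = processByAllocation.remove(allocationId);
        if (process != null) {
            process.kill();
        }
    }
}

The point the sketch mirrors is that kill() never blocks on the lock, which is why the close path must tolerate the process being killed, or its map entry disappearing, at any moment.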