
Commit 5eb38ec

Author: David Roberts
[ML] Fix possible race condition when closing an opening job (#42506)
This change fixes a race condition that could leave an in-memory data structure out of sync with the persistent tasks in cluster state. If triggered often enough, this could make it impossible to open any ML job on the affected node: the master node would think the node had capacity to open another job, but the chosen node would error during the open sequence because its in-memory data structure was full.

The race could be triggered by opening a job and then closing it a tiny fraction of a second later. It is unlikely that a user of the UI could open and close a job that fast, but a script or program calling the REST API could. The nasty thing is that, going by the externally observable states and stats, everything would appear to be fine: the fast open-then-close sequence would appear to leave the job in the closed state. It is only later that the leftovers in the in-memory data structure might build up and cause a problem.
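The underlying bug is a classic check-then-act race: the process state was checked before the process context lock was taken, so a close request arriving in that window could run before the open finished, leaving a stale entry in the in-memory map. Below is a minimal, self-contained sketch of that pattern, not the Elasticsearch code: the class, enum, and method names are invented for illustration, and only the shape of the fix (performing the check under the same lock as the state transition, as createProcessAndSetRunning now does) is taken from this commit.

// Illustrative sketch only: the names below (CheckThenActSketch, ProcessSlot, State)
// are invented and do not exist in Elasticsearch. The point is the difference between
// checking state outside the lock (racy) and inside the lock (as this commit does).
import java.util.concurrent.locks.ReentrantLock;

public class CheckThenActSketch {

    enum State { NOT_RUNNING, RUNNING, DYING }

    static class ProcessSlot {
        private final ReentrantLock lock = new ReentrantLock();
        // volatile only for visibility in this demo; the bug being illustrated is
        // the ordering of check and update, not memory visibility.
        private volatile State state = State.NOT_RUNNING;

        // Racy shape: the check happens before the lock is taken, so a concurrent
        // close() can mark the slot DYING in between, and the start then overwrites it.
        boolean startRacy() {
            if (state != State.NOT_RUNNING) {
                return false;
            }
            lock.lock();
            try {
                state = State.RUNNING;   // may clobber DYING set by a concurrent close()
                return true;
            } finally {
                lock.unlock();
            }
        }

        // Fixed shape: check and transition happen under the same lock, mirroring the
        // check that this commit moves into createProcessAndSetRunning().
        boolean startSafe() {
            lock.lock();
            try {
                if (state != State.NOT_RUNNING) {
                    return false;        // a concurrent close already won the race
                }
                state = State.RUNNING;
                return true;
            } finally {
                lock.unlock();
            }
        }

        void close() {
            lock.lock();
            try {
                state = State.DYING;
            } finally {
                lock.unlock();
            }
        }

        State state() {
            return state;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessSlot slot = new ProcessSlot();
        Thread opener = new Thread(slot::startSafe);   // swap in startRacy() to see the bad interleaving
        Thread closer = new Thread(slot::close);
        opener.start();
        closer.start();
        opener.join();
        closer.join();
        // With startSafe() the final state is always DYING once close() has run;
        // with startRacy() it can end up RUNNING even though the job was closed.
        System.out.println("final state: " + slot.state());
    }
}

The design point mirrored from the commit is that returning false from the locked check lets the caller skip restoring model state and marking the job opened, instead of racing the close.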
1 parent 8f03033 commit 5eb38ec

File tree

1 file changed: +19 -15 lines


x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 19 additions & 15 deletions
@@ -401,16 +401,12 @@ protected void doRun() {
                    logger.debug("Aborted opening job [{}] as it has been closed", jobId);
                    return;
                }
-               if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
-                   logger.debug("Cannot open job [{}] when its state is [{}]",
-                       jobId, processContext.getState().getClass().getName());
-                   return;
-               }

                try {
-                   createProcessAndSetRunning(processContext, job, params, closeHandler);
-                   processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
-                   setJobState(jobTask, JobState.OPENED);
+                   if (createProcessAndSetRunning(processContext, job, params, closeHandler)) {
+                       processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot());
+                       setJobState(jobTask, JobState.OPENED);
+                   }
                } catch (Exception e1) {
                    // No need to log here as the persistent task framework will log it
                    try {
@@ -447,19 +443,25 @@ protected void doRun() {
            ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler);
    }

-   private void createProcessAndSetRunning(ProcessContext processContext,
-                                           Job job,
-                                           AutodetectParams params,
-                                           BiConsumer<Exception, Boolean> handler) throws IOException {
+   private boolean createProcessAndSetRunning(ProcessContext processContext,
+                                              Job job,
+                                              AutodetectParams params,
+                                              BiConsumer<Exception, Boolean> handler) throws IOException {
        // At this point we lock the process context until the process has been started.
        // The reason behind this is to ensure closing the job does not happen before
        // the process is started as that can result to the job getting seemingly closed
        // but the actual process is hanging alive.
        processContext.tryLock();
        try {
+           if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
+               logger.debug("Cannot open job [{}] when its state is [{}]",
+                   job.getId(), processContext.getState().getClass().getName());
+               return false;
+           }
            AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler);
            communicator.writeHeader();
            processContext.setRunning(communicator);
+           return true;
        } finally {
            // Now that the process is running and we have updated its state we can unlock.
            // It is important to unlock before we initialize the communicator (ie. load the model state)
@@ -592,6 +594,8 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
        try {
            if (processContext.setDying() == false) {
                logger.debug("Cannot close job [{}] as it has been marked as dying", jobId);
+               // The only way we can get here is if 2 close requests are made very close together.
+               // The other close has done the work so it's safe to return here without doing anything.
                return;
            }

@@ -605,10 +609,10 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
            if (communicator == null) {
                logger.debug("Job [{}] is being closed before its process is started", jobId);
                jobTask.markAsCompleted();
-               return;
+           } else {
+               communicator.close(restart, reason);
            }

-           communicator.close(restart, reason);
            processByAllocation.remove(allocationId);
        } catch (Exception e) {
            // If the close failed because the process has explicitly been killed by us then just pass on that exception
@@ -628,7 +632,7 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) {
            try {
                nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription());
            } catch (IOException e) {
-               logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e);
+               logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobId), e);
            }
        }
