-
Notifications
You must be signed in to change notification settings - Fork 25.6k
[ML] Fix possible race condition when closing an opening job #42506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -401,16 +401,12 @@ protected void doRun() { | |
| logger.debug("Aborted opening job [{}] as it has been closed", jobId); | ||
| return; | ||
| } | ||
| if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { | ||
| logger.debug("Cannot open job [{}] when its state is [{}]", | ||
| jobId, processContext.getState().getClass().getName()); | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| createProcessAndSetRunning(processContext, job, params, closeHandler); | ||
| processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot()); | ||
| setJobState(jobTask, JobState.OPENED); | ||
| if (createProcessAndSetRunning(processContext, job, params, closeHandler)) { | ||
| processContext.getAutodetectCommunicator().restoreState(params.modelSnapshot()); | ||
| setJobState(jobTask, JobState.OPENED); | ||
| } | ||
| } catch (Exception e1) { | ||
| // No need to log here as the persistent task framework will log it | ||
| try { | ||
|
|
@@ -447,19 +443,25 @@ protected void doRun() { | |
| ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler); | ||
| } | ||
|
|
||
| private void createProcessAndSetRunning(ProcessContext processContext, | ||
| Job job, | ||
| AutodetectParams params, | ||
| BiConsumer<Exception, Boolean> handler) throws IOException { | ||
| private boolean createProcessAndSetRunning(ProcessContext processContext, | ||
| Job job, | ||
| AutodetectParams params, | ||
| BiConsumer<Exception, Boolean> handler) throws IOException { | ||
| // At this point we lock the process context until the process has been started. | ||
| // The reason behind this is to ensure closing the job does not happen before | ||
| // the process is started as that can result to the job getting seemingly closed | ||
| // but the actual process is hanging alive. | ||
| processContext.tryLock(); | ||
| try { | ||
| if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { | ||
| logger.debug("Cannot open job [{}] when its state is [{}]", | ||
| job.getId(), processContext.getState().getClass().getName()); | ||
| return false; | ||
| } | ||
| AutodetectCommunicator communicator = create(processContext.getJobTask(), job, params, handler); | ||
| communicator.writeHeader(); | ||
| processContext.setRunning(communicator); | ||
| return true; | ||
| } finally { | ||
| // Now that the process is running and we have updated its state we can unlock. | ||
| // It is important to unlock before we initialize the communicator (ie. load the model state) | ||
|
|
@@ -592,6 +594,8 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) { | |
| try { | ||
| if (processContext.setDying() == false) { | ||
| logger.debug("Cannot close job [{}] as it has been marked as dying", jobId); | ||
| // The only way we can get here is if 2 close requests are made very close together. | ||
| // The other close has done the work so it's safe to return here without doing anything. | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -605,10 +609,10 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) { | |
| if (communicator == null) { | ||
| logger.debug("Job [{}] is being closed before its process is started", jobId); | ||
| jobTask.markAsCompleted(); | ||
| return; | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning here without removing the entry from |
||
| } else { | ||
| communicator.close(restart, reason); | ||
| } | ||
|
|
||
| communicator.close(restart, reason); | ||
| processByAllocation.remove(allocationId); | ||
| } catch (Exception e) { | ||
| // If the close failed because the process has explicitly been killed by us then just pass on that exception | ||
|
|
@@ -628,7 +632,7 @@ public void closeJob(JobTask jobTask, boolean restart, String reason) { | |
| try { | ||
| nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription()); | ||
| } catch (IOException e) { | ||
| logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e); | ||
| logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobId), e); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check needs to be done under lock to prevent a close sneaking in after this check but before the open actions are performed.