Skip to content

Commit 0893e55

Browse files
[7.4][ML] Avoid marking data frame analytics task completed twice (#46721) (#46725)
When the stop API is called while the task is running, there is a chance that the task gets marked as completed twice. This may cause undesired side effects, such as indexing the progress document a second time after the stop API has returned (the cause of #46705). This commit adds a check that the task has not already been completed before marking it as completed. In addition, when we update the task's state, we could get warnings that the task is missing if the stop API has been called in the meantime. We now check whether the error is a `ResourceNotFoundException` and, if so, ignore it. Closes #46705. Backports #46721.
1 parent 8cf9ebe commit 0893e55

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,12 @@ protected void onCancelled() {
458458

459459
@Override
460460
public void markAsCompleted() {
461-
persistProgress(() -> super.markAsCompleted());
461+
// It is possible that the stop API has been called in the meantime and that
462+
// may also cause this method to be called. We check whether we have already
463+
// been marked completed to avoid doing it twice.
464+
if (isCompleted() == false) {
465+
persistProgress(() -> super.markAsCompleted());
466+
}
462467
}
463468

464469
@Override

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
1213
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
@@ -79,7 +80,13 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
7980
case STARTED:
8081
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
8182
updatedTask -> reindexingStateListener.onResponse(config),
82-
reindexingStateListener::onFailure));
83+
error -> {
84+
if (error instanceof ResourceNotFoundException) {
85+
// The task has been stopped
86+
} else {
87+
reindexingStateListener.onFailure(error);
88+
}
89+
}));
8390
break;
8491
// The task has fully reindexed the documents and we should continue on with our analyses
8592
case ANALYZING:
@@ -222,7 +229,13 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
222229
task.markAsCompleted();
223230
}
224231
}),
225-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
232+
error -> {
233+
if (error instanceof ResourceNotFoundException) {
234+
// Task has stopped
235+
} else {
236+
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
237+
}
238+
}
226239
));
227240
},
228241
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())

0 commit comments

Comments
 (0)