Skip to content

Commit 3f18cf7

Browse files
[ML] Avoid marking data frame analytics task completed twice (#46721)
When the stop API is called while the task is running, there is a chance that the task gets marked as completed twice. This may cause undesired side effects, such as indexing the progress document a second time after the stop API has returned (the cause of #46705). This commit adds a check that the task has not already been completed before marking it as completed. In addition, when we update the task's state, we could get warnings that the task was missing if the stop API had been called in the meantime. We now check whether the error is a `ResourceNotFoundException` and ignore it if so. Closes #46705
1 parent 3074e51 commit 3f18cf7

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
1213
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
@@ -78,7 +79,13 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current
7879
case STARTED:
7980
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
8081
updatedTask -> reindexingStateListener.onResponse(config),
81-
reindexingStateListener::onFailure));
82+
error -> {
83+
if (error instanceof ResourceNotFoundException) {
84+
// The task has been stopped
85+
} else {
86+
reindexingStateListener.onFailure(error);
87+
}
88+
}));
8289
break;
8390
// The task has fully reindexed the documents and we should continue on with our analyses
8491
case ANALYZING:
@@ -221,7 +228,13 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
221228
task.markAsCompleted();
222229
}
223230
}),
224-
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
231+
error -> {
232+
if (error instanceof ResourceNotFoundException) {
233+
// Task has stopped
234+
} else {
235+
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
236+
}
237+
}
225238
));
226239
},
227240
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,12 @@ protected void onCancelled() {
9696

9797
@Override
9898
public void markAsCompleted() {
99-
persistProgress(() -> super.markAsCompleted());
99+
// It is possible that the stop API has been called in the meantime and that
100+
// may also cause this method to be called. We check whether we have already
101+
// been marked completed to avoid doing it twice.
102+
if (isCompleted() == false) {
103+
persistProgress(() -> super.markAsCompleted());
104+
}
100105
}
101106

102107
@Override

0 commit comments

Comments
 (0)