From 3b5ad597f42d1cbcd385113349f93ee899299d63 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Sun, 15 Sep 2019 11:56:57 +0300 Subject: [PATCH] [7.4][ML] Avoid marking data frame analytics task completed twice (#46721) When the stop API is called while the task is running there is a chance the task gets marked completed twice. This may cause undesired side effects, like indexing the progress document a second time after the stop API has returned (the cause for #46705). This commit adds a check that the task has not been completed before proceeding to mark it so. In addition, when we update the task's state we could get some warnings that the task was missing if the stop API has been called in the meantime. We now check the errors are `ResourceNotFoundException` and ignore them if so. Closes #46705 Backports #46721 --- .../TransportStartDataFrameAnalyticsAction.java | 7 ++++++- .../ml/dataframe/DataFrameAnalyticsManager.java | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index c0d8d6fbc3260..e30c4e593a7f1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -458,7 +458,12 @@ protected void onCancelled() { @Override public void markAsCompleted() { - persistProgress(() -> super.markAsCompleted()); + // It is possible that the stop API has been called in the meantime and that + // may also cause this method to be called. We check whether we have already + // been marked completed to avoid doing it twice. + if (isCompleted() == false) { + persistProgress(() -> super.markAsCompleted()); + } } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 7856edf2a32ef..b5d288cd11e46 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; @@ -79,7 +80,13 @@ public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState current case STARTED: task.updatePersistentTaskState(reindexingState, ActionListener.wrap( updatedTask -> reindexingStateListener.onResponse(config), - reindexingStateListener::onFailure)); + error -> { + if (error instanceof ResourceNotFoundException) { + // The task has been stopped + } else { + reindexingStateListener.onFailure(error); + } + })); break; // The task has fully reindexed the documents and we should continue on with our analyses case ANALYZING: @@ -222,7 +229,13 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi task.markAsCompleted(); } }), - error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()) + error -> { + if (error instanceof ResourceNotFoundException) { + // Task has stopped + } else { + task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage()); + } + } )); }, error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())