diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/steps/FinalStep.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/steps/FinalStep.java index 1132b31c111d1..6bffefde369ea 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/steps/FinalStep.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/steps/FinalStep.java @@ -10,15 +10,15 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -36,6 +36,7 @@ import java.util.Collections; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * The final step of a data frame analytics job. @@ -80,7 +81,7 @@ private void indexDataCounts(ActionListener listener) { .id(DataCounts.documentId(config.getId())) .setRequireAlias(true) .source(builder); - parentTaskClient().index(indexRequest, listener); + executeAsyncWithOrigin(parentTaskClient(), ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener); } catch (IOException e) { listener.onFailure(ExceptionsHelper.serverError("[{}] Error persisting final data counts", e, config.getId())); } @@ -97,10 +98,7 @@ private void refreshIndices(ActionListener listener) { LOGGER.debug(() -> new ParameterizedMessage("[{}] Refreshing indices {}", config.getId(), Arrays.toString(refreshRequest.indices()))); - ParentTaskAssigningClient parentTaskClient = parentTaskClient(); - try (ThreadContext.StoredContext ignore = parentTaskClient.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { - parentTaskClient.admin().indices().refresh(refreshRequest, listener); - } + executeAsyncWithOrigin(parentTaskClient(), ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, listener); } @Override