diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 03b96c49a8c31..4f450e8a0b245 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -475,7 +475,6 @@ public void testJobRelocationIsMemoryAware() throws Exception { }); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/74101") public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception { internalCluster().ensureAtMostNumDataNodes(0); logger.info("Starting dedicated master node..."); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java index 6219548f4e733..2dd7289ffb0d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.persistence; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -14,6 +15,9 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.search.MultiSearchAction; import org.elasticsearch.action.search.MultiSearchRequest; @@ -26,10 +30,10 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; @@ -280,17 +284,7 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp ActionListener deleteByQueryExecutor = ActionListener.wrap( response -> { if (response && indexNames.get().length > 0) { - logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get())); - ConstantScoreQueryBuilder query = - new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); - DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get()) - .setQuery(query) - .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())) - .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) - .setAbortOnVersionConflict(false) - .setRefresh(true); - - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler); + deleteResultsByQuery(jobId, indexNames.get(), dbqHandler); } else { // We did not execute DBQ, no need to delete aliases or check the response dbqHandler.onResponse(null); } @@ -414,6 +408,32 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp deleteModelState(jobId, deleteStateHandler); } + private void deleteResultsByQuery(String jobId, String[] indices, ActionListener listener) { + assert indices.length > 0; + + ActionListener refreshListener = ActionListener.wrap( + refreshResponse -> { + logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices)); + ConstantScoreQueryBuilder query = + new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); + DeleteByQueryRequest request = new DeleteByQueryRequest(indices) + .setQuery(query) + .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())) + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) + .setAbortOnVersionConflict(false) + .setRefresh(true); + + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener); + }, + listener::onFailure + ); + + // First, we refresh the indices to ensure any in-flight docs become visible + RefreshRequest refreshRequest = new RefreshRequest(indices); + refreshRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())); + executeAsyncWithOrigin(client, ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, refreshListener); + } + private void deleteAliases(String jobId, ActionListener finishedHandler) { final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);