|
7 | 7 | package org.elasticsearch.xpack.ml.job.persistence; |
8 | 8 |
|
9 | 9 | import com.carrotsearch.hppc.cursors.ObjectObjectCursor; |
| 10 | + |
10 | 11 | import org.apache.logging.log4j.LogManager; |
11 | 12 | import org.apache.logging.log4j.Logger; |
12 | 13 | import org.elasticsearch.action.ActionListener; |
13 | 14 | import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; |
14 | 15 | import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; |
15 | 16 | import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; |
16 | 17 | import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; |
| 18 | +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; |
| 19 | +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; |
| 20 | +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; |
17 | 21 | import org.elasticsearch.action.bulk.BulkItemResponse; |
18 | 22 | import org.elasticsearch.action.search.MultiSearchAction; |
19 | 23 | import org.elasticsearch.action.search.MultiSearchRequest; |
|
26 | 30 | import org.elasticsearch.cluster.ClusterState; |
27 | 31 | import org.elasticsearch.cluster.metadata.AliasMetadata; |
28 | 32 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; |
| 33 | +import org.elasticsearch.common.util.concurrent.ThreadContext; |
29 | 34 | import org.elasticsearch.core.CheckedConsumer; |
30 | 35 | import org.elasticsearch.core.Nullable; |
31 | 36 | import org.elasticsearch.core.TimeValue; |
32 | | -import org.elasticsearch.common.util.concurrent.ThreadContext; |
33 | 37 | import org.elasticsearch.index.IndexNotFoundException; |
34 | 38 | import org.elasticsearch.index.query.BoolQueryBuilder; |
35 | 39 | import org.elasticsearch.index.query.ConstantScoreQueryBuilder; |
@@ -280,17 +284,7 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp |
280 | 284 | ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap( |
281 | 285 | response -> { |
282 | 286 | if (response && indexNames.get().length > 0) { |
283 | | - logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get())); |
284 | | - ConstantScoreQueryBuilder query = |
285 | | - new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); |
286 | | - DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get()) |
287 | | - .setQuery(query) |
288 | | - .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())) |
289 | | - .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) |
290 | | - .setAbortOnVersionConflict(false) |
291 | | - .setRefresh(true); |
292 | | - |
293 | | - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler); |
| 287 | + deleteResultsByQuery(jobId, indexNames.get(), dbqHandler); |
294 | 288 | } else { // We did not execute DBQ, no need to delete aliases or check the response |
295 | 289 | dbqHandler.onResponse(null); |
296 | 290 | } |
@@ -414,6 +408,32 @@ public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExp |
414 | 408 | deleteModelState(jobId, deleteStateHandler); |
415 | 409 | } |
416 | 410 |
|
| 411 | + private void deleteResultsByQuery(String jobId, String[] indices, ActionListener<BulkByScrollResponse> listener) { |
| 412 | + assert indices.length > 0; |
| 413 | + |
| 414 | + ActionListener<RefreshResponse> refreshListener = ActionListener.wrap( |
| 415 | + refreshResponse -> { |
| 416 | + logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices)); |
| 417 | + ConstantScoreQueryBuilder query = |
| 418 | + new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); |
| 419 | + DeleteByQueryRequest request = new DeleteByQueryRequest(indices) |
| 420 | + .setQuery(query) |
| 421 | + .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())) |
| 422 | + .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES) |
| 423 | + .setAbortOnVersionConflict(false) |
| 424 | + .setRefresh(true); |
| 425 | + |
| 426 | + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener); |
| 427 | + }, |
| 428 | + listener::onFailure |
| 429 | + ); |
| 430 | + |
| 431 | + // First, we refresh the indices to ensure any in-flight docs become visible |
| 432 | + RefreshRequest refreshRequest = new RefreshRequest(indices); |
| 433 | + refreshRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden())); |
| 434 | + executeAsyncWithOrigin(client, ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, refreshListener); |
| 435 | + } |
| 436 | + |
417 | 437 | private void deleteAliases(String jobId, ActionListener<AcknowledgedResponse> finishedHandler) { |
418 | 438 | final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); |
419 | 439 | final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); |
|
0 commit comments