diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index 1142b5411fb0c..d8dcd4ee43a66 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.ml.dataframe; import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -310,6 +311,15 @@ public static String documentId(String id) { return TYPE + "-" + id; } + /** + * Returns the job id from the doc id. Returns {@code null} if the doc id is invalid. + */ + @Nullable + public static String extractJobIdFromDocId(String docId) { + String jobId = docId.replaceAll("^" + TYPE +"-", ""); + return jobId.equals(docId) ? null : jobId; + } + public static class Builder { private String id; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java index 191045e5786fa..1fa16f5059d31 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java @@ -39,6 +39,8 @@ public class Classification implements DataFrameAnalysis { public static final ParseField TRAINING_PERCENT = new ParseField("training_percent"); public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed"); + private static final String STATE_DOC_ID_SUFFIX = "_classification_state#1"; + private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private static final ConstructingObjectParser STRICT_PARSER = createParser(false); @@ -258,7 +260,12 @@ public boolean persistsState() { @Override public String getStateDocId(String jobId) { - return jobId + "_classification_state#1"; + return jobId + STATE_DOC_ID_SUFFIX; + } + + public static String extractJobIdFromStateDoc(String stateDocId) { + int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX); + return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java index 8fffcd0f573da..27c8a3f2eb7ca 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java @@ -36,6 +36,8 @@ public class Regression implements DataFrameAnalysis { public static final ParseField TRAINING_PERCENT = new ParseField("training_percent"); public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed"); + private static final String STATE_DOC_ID_SUFFIX = "_regression_state#1"; + private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private static final ConstructingObjectParser STRICT_PARSER = createParser(false); @@ -196,7 +198,12 @@ public boolean persistsState() { @Override public String getStateDocId(String jobId) { - return jobId + "_regression_state#1"; + return jobId + STATE_DOC_ID_SUFFIX; + } + + public static String extractJobIdFromStateDoc(String stateDocId) { + int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX); + return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java index 33cb457e4e731..784b45d863ebb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java @@ -53,6 +53,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase { @@ -384,6 +385,13 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsBeforeItWas } } + public void testExtractJobIdFromDocId() { + assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"), equalTo("foo")); + assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-data_frame_analytics_config-foo"), + equalTo("data_frame_analytics_config-foo")); + assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue())); + } + private static void assertTooSmall(ElasticsearchStatusException e) { assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb.")); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java index e5991b976199e..8b23fe619efc8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java @@ -215,4 +215,9 @@ public void testGetStateDocId() { String randomId = randomAlphaOfLength(10); assertThat(classification.getStateDocId(randomId), equalTo(randomId + "_classification_state#1")); } + + public void testExtractJobIdFromStateDoc() { + assertThat(Classification.extractJobIdFromStateDoc("foo_bar-1_classification_state#1"), equalTo("foo_bar-1")); + assertThat(Classification.extractJobIdFromStateDoc("noop"), is(nullValue())); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java index cabb081146370..d45125bbc3d7e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java @@ -111,6 +111,11 @@ public void testGetStateDocId() { assertThat(regression.getStateDocId(randomId), equalTo(randomId + "_regression_state#1")); } + public void testExtractJobIdFromStateDoc() { + assertThat(Regression.extractJobIdFromStateDoc("foo_bar-1_regression_state#1"), equalTo("foo_bar-1")); + assertThat(Regression.extractJobIdFromStateDoc("noop"), is(nullValue())); + } + public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException { Regression regression = createRandom(); assertThat(regression.getRandomizeSeed(), is(notNullValue())); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java index dcc465d98bb97..33537af3f4477 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; @@ -17,6 +18,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; @@ -314,6 +316,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds)); } + public void testDeleteExpiredData_RemovesUnusedState() throws Exception { + initialize("classification_delete_expired_data"); + indexData(sourceIndex, 100, 0, KEYWORD_FIELD); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD)); + registerAnalytics(config); + putAnalytics(config); + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + assertProgress(jobId, 100, 100, 100, 100); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertInferenceModelPersisted(jobId); + + // Call _delete_expired_data API and check nothing was deleted + assertThat(deleteExpiredData().isDeleted(), is(true)); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + + // Delete the config straight from the config index + DeleteResponse deleteResponse = client().prepareDelete().setIndex(".ml-config").setId(DataFrameAnalyticsConfig.documentId(jobId)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet(); + assertThat(deleteResponse.status(), equalTo(RestStatus.OK)); + + // Now calling the _delete_expired_data API should remove unused state + assertThat(deleteExpiredData().isDeleted(), is(true)); + + SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet(); + assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + } + private void initialize(String jobId) { this.jobId = jobId; this.sourceIndex = jobId + "_source_index"; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 66fae8e458789..2e75bd2d40a59 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -87,7 +87,7 @@ public void tearDownData() { cleanUp(); } - public void testDeleteExpiredDataGivenNothingToDelete() throws Exception { + public void testDeleteExpiredData_GivenNothingToDelete() throws Exception { // Tests that nothing goes wrong when there's nothing to delete client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); } @@ -202,10 +202,7 @@ public void testDeleteExpiredData() throws Exception { assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK)); // Now call the action under test - client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); - - // We need to refresh to ensure the deletion is visible - client().admin().indices().prepareRefresh("*").get(); + assertThat(deleteExpiredData().isDeleted(), is(true)); // no-retention job should have kept all data assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70))); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index fad76f8b23f80..2d41cf6409700 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -21,6 +21,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; @@ -45,7 +46,6 @@ import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; -import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java index 82af47ce893f6..d99f58e608e12 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java @@ -40,7 +40,7 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; -import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; +import org.elasticsearch.xpack.ml.dataframe.StoredProgress; import org.hamcrest.Matcher; import org.hamcrest.Matchers; @@ -205,7 +205,7 @@ protected void assertProgress(String id, int reindexing, int loadingData, int an } protected SearchResponse searchStoredProgress(String jobId) { - String docId = DataFrameAnalyticsTask.progressDocId(jobId); + String docId = StoredProgress.documentId(jobId); return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) .setQuery(QueryBuilders.idsQuery().addIds(docId)) .get(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index b3d8878af18c3..4916fef97898d 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -113,6 +114,16 @@ private void waitForPendingTasks() { } } + protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception { + DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE, + new DeleteExpiredDataAction.Request()).get(); + + // We need to refresh to ensure the deletion is visible + client().admin().indices().prepareRefresh("*").get(); + + return response; + } + @Override protected void ensureClusterStateConsistency() throws IOException { if (cluster() != null && cluster().size() > 0) { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index bd22f9ee9ced1..5ecab6f69d429 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -7,11 +7,13 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; @@ -272,6 +274,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds)); } + public void testDeleteExpiredData_RemovesUnusedState() throws Exception { + initialize("regression_delete_expired_data"); + indexData(sourceIndex, 100, 0); + + DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD)); + registerAnalytics(config); + putAnalytics(config); + startAnalytics(jobId); + waitUntilAnalyticsIsStopped(jobId); + + assertProgress(jobId, 100, 100, 100, 100); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + assertInferenceModelPersisted(jobId); + + // Call _delete_expired_data API and check nothing was deleted + assertThat(deleteExpiredData().isDeleted(), is(true)); + assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L)); + assertModelStatePersisted(stateDocId()); + + // Delete the config straight from the config index + DeleteResponse deleteResponse = client().prepareDelete().setIndex(".ml-config").setId(DataFrameAnalyticsConfig.documentId(jobId)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet(); + assertThat(deleteResponse.status(), equalTo(RestStatus.OK)); + + // Now calling the _delete_expired_data API should remove unused state + assertThat(deleteExpiredData().isDeleted(), is(true)); + + SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet(); + assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + } + private void initialize(String jobId) { this.jobId = jobId; this.sourceIndex = jobId + "_source_index"; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java index 3edefe5053735..fc8bdeca5f55a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java @@ -43,7 +43,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; +import org.elasticsearch.xpack.ml.dataframe.StoredProgress; import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; @@ -171,7 +171,7 @@ private void deleteState(ParentTaskAssigningClient parentTaskClient, DataFrameAnalyticsConfig config, ActionListener listener) { List ids = new ArrayList<>(); - ids.add(DataFrameAnalyticsTask.progressDocId(config.getId())); + ids.add(StoredProgress.documentId(config.getId())); if (config.getAnalysis().persistsState()) { ids.add(config.getAnalysis().getStateDocId(config.getId())); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index aadd6041ae642..4b35b092285e2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -187,7 +187,7 @@ private void searchStoredProgresses(List configIds, ActionListener { GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0); IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()); - indexRequest.id(progressDocId(taskParams.getId())); + indexRequest.id(StoredProgress.documentId(taskParams.getId())); indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) { new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS); @@ -310,10 +310,6 @@ public static StartingState determineStartingState(String jobId, List extends BatchedDocumentsIterator> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java index 92235570b47b5..65e8b75671151 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java @@ -9,6 +9,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator; /** * Iterates through the state doc ids diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java index 47ab2364db67e..23834f6949355 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java @@ -12,7 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.Result; -import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator; +import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator; import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index c603502afd5d6..8a1d30382489f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -16,14 +16,19 @@ import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification; +import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.dataframe.StoredProgress; import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator; +import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator; import java.util.ArrayList; import java.util.Arrays; @@ -93,6 +98,13 @@ private List findUnusedStateDocIds() { private Set getJobIds() { Set jobIds = new HashSet<>(); + jobIds.addAll(getAnamalyDetectionJobIds()); + jobIds.addAll(getDataFrameAnalyticsJobIds()); + return jobIds; + } + + private Set getAnamalyDetectionJobIds() { + Set jobIds = new HashSet<>(); // TODO Once at 8.0, we can stop searching for jobs in cluster state // and remove cluster service as a member all together. @@ -106,6 +118,18 @@ private Set getJobIds() { return jobIds; } + private Set getDataFrameAnalyticsJobIds() { + Set jobIds = new HashSet<>(); + + DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, AnomalyDetectorsIndex.configIndexName(), + QueryBuilders.termQuery(DataFrameAnalyticsConfig.CONFIG_TYPE.getPreferredName(), DataFrameAnalyticsConfig.TYPE)); + while (iterator.hasNext()) { + Deque docIds = iterator.next(); + docIds.stream().map(DataFrameAnalyticsConfig::extractJobIdFromDocId).filter(Objects::nonNull).forEach(jobIds::add); + } + return jobIds; + } + private void executeDeleteUnusedStateDocs(List unusedDocIds, ActionListener listener) { LOGGER.info("Found [{}] unused state documents; attempting to delete", unusedDocIds.size()); @@ -137,7 +161,13 @@ private void executeDeleteUnusedStateDocs(List unusedDocIds, ActionListe private static class JobIdExtractor { private static List> extractors = Arrays.asList( - ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId); + ModelState::extractJobId, + Quantiles::extractJobId, + CategorizerState::extractJobId, + Classification::extractJobIdFromStateDoc, + Regression::extractJobIdFromStateDoc, + StoredProgress::extractJobIdFromDocId + ); private static String extractJobId(String docId) { String jobId; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIterator.java similarity index 99% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIterator.java index 4aebbd0743d28..119dcbdb42822 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIterator.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.persistence; +package org.elasticsearch.xpack.ml.utils.persistence; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/DocIdBatchedDocumentIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/DocIdBatchedDocumentIterator.java new file mode 100644 index 0000000000000..55b2cee2ff16d --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/DocIdBatchedDocumentIterator.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.utils.persistence; + +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; + +import java.util.Objects; + +/** + * This is a document iterator that returns just the id of each matched document. + */ +public class DocIdBatchedDocumentIterator extends BatchedDocumentsIterator { + + private final QueryBuilder query; + + public DocIdBatchedDocumentIterator(Client client, String index, QueryBuilder query) { + super(client, index); + this.query = Objects.requireNonNull(query); + } + + @Override + protected QueryBuilder getQuery() { + return query; + } + + @Override + protected String map(SearchHit hit) { + return hit.getId(); + } + + @Override + protected boolean shouldFetchSource() { + return false; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/StoredProgressTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/StoredProgressTests.java index 572ca816f81e6..0524fbd37c336 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/StoredProgressTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/StoredProgressTests.java @@ -13,6 +13,8 @@ import java.util.ArrayList; import java.util.List; +import static org.hamcrest.Matchers.equalTo; + public class StoredProgressTests extends AbstractXContentTestCase { @Override @@ -34,4 +36,14 @@ protected StoredProgress createTestInstance() { } return new StoredProgress(progress); } + + public void testDocumentId() { + assertThat(StoredProgress.documentId("foo"), equalTo("data_frame_analytics-foo-progress")); + } + + public void testExtractJobIdFromDocId() { + assertThat(StoredProgress.extractJobIdFromDocId("data_frame_analytics-foo-progress"), equalTo("foo")); + assertThat(StoredProgress.extractJobIdFromDocId("data_frame_analytics-data_frame_analytics-bar-progress-progress"), + equalTo("data_frame_analytics-bar-progress")); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIteratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java similarity index 99% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIteratorTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java index 5e30ac1b29d1d..ee862695beb27 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIteratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.persistence; +package org.elasticsearch.xpack.ml.utils.persistence; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.ActionFuture;