From a2e8490373f74cfdfbdaf52baacb5ef35ba31cf6 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 16 May 2018 12:16:13 +0300 Subject: [PATCH 1/3] [ML] Clean left behind model state docs It is possible for model state documents to be left behind in the state index. This may be because of bugs or uncontrollable scenarios. In any case, those documents may take up quite some disk space when they add up. This commit adds a step in the expired data deletion that is part of the daily maintenance service. The new step searches for state documents that do not belong to any of the current jobs and deletes them. Closes #30551 --- .../TransportDeleteExpiredDataAction.java | 4 +- .../persistence/BatchedDocumentsIterator.java | 9 ++ .../BatchedStateDocIdsIterator.java | 36 ++++++ .../ml/job/retention/UnusedStateRemover.java | 107 ++++++++++++++++++ .../ml/integration/DeleteExpiredDataIT.java | 36 +++++- 5 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 0e1ca9dd9aec3..9ab2132b61912 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover; import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover; import org.elasticsearch.xpack.ml.job.retention.MlDataRemover; +import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator; @@ -56,7 +57,8 @@ private void deleteExpiredData(ActionListener List dataRemovers = Arrays.asList( new ExpiredResultsRemover(client, clusterService, auditor), new ExpiredForecastsRemover(client), - new ExpiredModelSnapshotsRemover(client, clusterService) + new ExpiredModelSnapshotsRemover(client, clusterService), + new UnusedStateRemover(client, clusterService) ); Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); deleteExpiredData(dataRemoversIterator, listener); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java index cf50579a0e517..d50a7c3f8c2ad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java @@ -97,6 +97,7 @@ private SearchResponse initScroll() { searchRequest.source(new SearchSourceBuilder() .size(BATCH_SIZE) .query(getQuery()) + .fetchSource(shouldFetchSource()) .sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC))); SearchResponse searchResponse = client.search(searchRequest).actionGet(); @@ -123,6 +124,14 @@ private Deque mapHits(SearchResponse searchResponse) { return results; } + /** + * Should fetch source? Defaults to {@code true} + * @return whether the source should be fetched + */ + protected boolean shouldFetchSource() { + return true; + } + /** * Get the query to use for the search * @return the search query diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java new file mode 100644 index 0000000000000..92235570b47b5 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.persistence; + +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; + +/** + * Iterates through the state doc ids + */ +public class BatchedStateDocIdsIterator extends BatchedDocumentsIterator { + + public BatchedStateDocIdsIterator(Client client, String index) { + super(client, index); + } + + @Override + protected boolean shouldFetchSource() { + return false; + } + + @Override + protected QueryBuilder getQuery() { + return QueryBuilders.matchAllQuery(); + } + + @Override + protected String map(SearchHit hit) { + return hit.getId(); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java new file mode 100644 index 0000000000000..2e0871a7a57d9 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.retention; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.core.ml.MLMetadataField; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator; + +import java.util.Collections; +import java.util.Deque; +import java.util.Objects; +import java.util.Set; + +public class UnusedStateRemover implements MlDataRemover { + + private static final Logger LOGGER = Loggers.getLogger(UnusedStateRemover.class); + + private final Client client; + private final ClusterService clusterService; + + public UnusedStateRemover(Client client, ClusterService clusterService) { + this.client = Objects.requireNonNull(client); + this.clusterService = Objects.requireNonNull(clusterService); + } + + @Override + public void remove(ActionListener listener) { + try { + BulkRequestBuilder deleteUnusedStateRequestBuilder = findUnusedStateDocs(); + if (deleteUnusedStateRequestBuilder.numberOfActions() > 0) { + executeDeleteUnusedStateDocs(deleteUnusedStateRequestBuilder, listener); + } else { + listener.onResponse(true); + } + } catch (Exception e) { + listener.onFailure(e); + } + } + + private BulkRequestBuilder findUnusedStateDocs() { + Set jobIds = getJobIds(); + BulkRequestBuilder deleteUnusedStateRequestBuilder = client.prepareBulk(); + BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client, AnomalyDetectorsIndex.jobStateIndexName()); + while (stateDocIdsIterator.hasNext()) { + Deque stateDocIds = stateDocIdsIterator.next(); + for (String stateDocId : stateDocIds) { + int modelStateSuffixIndex = stateDocId.indexOf("_model_state_"); + if (modelStateSuffixIndex < 0) { + // e.g. quantiles, etc. + continue; + } + String jobId = stateDocId.substring(0, modelStateSuffixIndex); + if (jobIds.contains(jobId) == false) { + deleteUnusedStateRequestBuilder.add(new DeleteRequest( + AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId)); + } + } + } + return deleteUnusedStateRequestBuilder; + } + + private Set getJobIds() { + ClusterState clusterState = clusterService.state(); + MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE); + if (mlMetadata != null) { + return mlMetadata.getJobs().keySet(); + } + return Collections.emptySet(); + } + + private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener listener) { + LOGGER.info("Found {} unused model state documents; attempting to delete", + deleteUnusedStateRequestBuilder.numberOfActions()); + deleteUnusedStateRequestBuilder.execute(new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + LOGGER.error("Some unused model state documents could not be deleted due to failures: {}", + bulkItemResponses.buildFailureMessage()); + } else { + LOGGER.info("Successfully deleted unused model state documents"); + } + listener.onResponse(true); + } + + @Override + public void onFailure(Exception e) { + LOGGER.error("Error deleting unused model state documents: ", e); + listener.onFailure(e); + } + }); + } +} diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 3a1fc2b0f6d4a..62dbd1e33d8f4 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -8,12 +8,15 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateAction; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -21,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; @@ -31,13 +35,16 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { @@ -78,11 +85,16 @@ public void setUpData() throws IOException { } @After - public void tearDownData() throws Exception { + public void tearDownData() { client().admin().indices().prepareDelete(DATA_INDEX).get(); cleanUp(); } + public void testDeleteExpiredDataGivenNothingToDelete() throws Exception { + // Tests that nothing goes wrong when there's nothing to delete + client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); + } + public void testDeleteExpiredData() throws Exception { registerJob(newJobBuilder("no-retention").setResultsRetentionDays(null).setModelSnapshotRetentionDays(null)); registerJob(newJobBuilder("results-retention").setResultsRetentionDays(1L).setModelSnapshotRetentionDays(null)); @@ -166,6 +178,18 @@ public void testDeleteExpiredData() throws Exception { assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount())); } + // Index some unused state documents (more than 10K to test scrolling works) + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < 10010; i++) { + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", + "non_existing_job_model_state_" + i); + indexRequest.source(Collections.emptyMap()); + bulkRequestBuilder.add(indexRequest); + } + assertThat(bulkRequestBuilder.get().status(), equalTo(RestStatus.OK)); + + // Now call the action under test client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); // We need to refresh to ensure the deletion is visible @@ -216,6 +240,16 @@ public void testDeleteExpiredData() throws Exception { assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L)); } } + + // Verify .ml-state doesn't contain unused state documents + SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName()) + .setFetchSource(false) + .setSize(10000) + .get(); + assertThat(stateDocsResponse.getHits().getTotalHits(), lessThan(10000L)); + for (SearchHit hit : stateDocsResponse.getHits().getHits()) { + assertThat(hit.getId().startsWith("non_existing_job"), is(false)); + } } private static Job.Builder newJobBuilder(String id) { From 78e61c7005ad203cd09eba8a5a0808181e19e472 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 17 May 2018 00:45:08 +0300 Subject: [PATCH 2/3] Also remove all other state docs and other review comments --- .../autodetect/state/CategorizerState.java | 10 +++++ .../process/autodetect/state/ModelState.java | 10 +++++ .../process/autodetect/state/Quantiles.java | 10 +++++ .../state/CategorizerStateTests.java | 28 +++++++++++++ .../autodetect/state/ModelStateTests.java | 30 ++++++++++++++ .../autodetect/state/QuantilesTests.java | 16 ++++++++ .../ml/job/retention/UnusedStateRemover.java | 41 +++++++++++++++---- .../ml/integration/DeleteExpiredDataIT.java | 4 +- 8 files changed, 140 insertions(+), 9 deletions(-) create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java index 8c08300354698..c1ee6daca063f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java @@ -37,6 +37,16 @@ public static final String v54DocumentPrefix(String jobId) { return jobId + "#"; } + /** + * Given the id of a categorizer state document it extracts the job id + * @param docId the categorizer state document id + * @return the job id or {@code null} if the id is not valid + */ + public static final String extractJobId(String docId) { + int suffixIndex = docId.indexOf("_" + TYPE); + return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex); + } + private CategorizerState() { } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java index dce791a2b3d26..27d9fbdc6fc49 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java @@ -29,6 +29,16 @@ public static final String v54DocumentId(String jobId, String snapshotId, int do return jobId + "-" + snapshotId + "#" + docNum; } + /** + * Given the id of a state document it extracts the job id + * @param docId the state document id + * @return the job id or {@code null} if the id is not valid + */ + public static final String extractJobId(String docId) { + int suffixIndex = docId.indexOf("_" + TYPE + "_"); + return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex); + } + private ModelState() { } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java index 0c167aadb7623..8a7c4d9d2c5b0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java @@ -60,6 +60,16 @@ public static String v54DocumentId(String jobId) { return jobId + "-" + TYPE; } + /** + * Given the id of a quantiles document it extracts the job id + * @param docId the quantiles document id + * @return the job id or {@code null} if the id is not valid + */ + public static final String extractJobId(String docId) { + int suffixIndex = docId.indexOf("_" + TYPE); + return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex); + } + private final String jobId; private final Date timestamp; private final String quantileState; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java new file mode 100644 index 0000000000000..df927bd699a0d --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.job.process.autodetect.state; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; + +public class CategorizerStateTests extends ESTestCase { + + public void testExtractJobId_GivenValidDocId() { + assertThat(CategorizerState.extractJobId("foo_categorizer_state#1"), equalTo("foo")); + assertThat(CategorizerState.extractJobId("bar_categorizer_state#2"), equalTo("bar")); + assertThat(CategorizerState.extractJobId("foo_bar_categorizer_state#3"), equalTo("foo_bar")); + } + + public void testExtractJobId_GivenInvalidDocId() { + assertThat(CategorizerState.extractJobId(""), is(nullValue())); + assertThat(CategorizerState.extractJobId("foo"), is(nullValue())); + assertThat(CategorizerState.extractJobId("_categorizer_state"), is(nullValue())); + assertThat(CategorizerState.extractJobId("foo_model_state_3141341341"), is(nullValue())); + } +} \ No newline at end of file diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java new file mode 100644 index 0000000000000..70b1615cf0706 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.job.process.autodetect.state; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; + +public class ModelStateTests extends ESTestCase { + + public void testExtractJobId_GivenValidDocId() { + assertThat(ModelState.extractJobId("foo_model_state_3151373783#1"), equalTo("foo")); + assertThat(ModelState.extractJobId("bar_model_state_451515#3"), equalTo("bar")); + assertThat(ModelState.extractJobId("foo_bar_model_state_blah_blah"), equalTo("foo_bar")); + } + + public void testExtractJobId_GivenInvalidDocId() { + assertThat(ModelState.extractJobId(""), is(nullValue())); + assertThat(ModelState.extractJobId("foo"), is(nullValue())); + assertThat(ModelState.extractJobId("_model_3141341341"), is(nullValue())); + assertThat(ModelState.extractJobId("_state_3141341341"), is(nullValue())); + assertThat(ModelState.extractJobId("_model_state_3141341341"), is(nullValue())); + assertThat(ModelState.extractJobId("foo_quantiles"), is(nullValue())); + } +} \ No newline at end of file diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java index 84c1a161f1ee4..b97524d7fd8e8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java @@ -15,9 +15,25 @@ import java.util.Date; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; public class QuantilesTests extends AbstractSerializingTestCase { + public void testExtractJobId_GivenValidDocId() { + assertThat(Quantiles.extractJobId("foo_quantiles"), equalTo("foo")); + assertThat(Quantiles.extractJobId("bar_quantiles"), equalTo("bar")); + assertThat(Quantiles.extractJobId("foo_bar_quantiles"), equalTo("foo_bar")); + } + + public void testExtractJobId_GivenInvalidDocId() { + assertThat(Quantiles.extractJobId(""), is(nullValue())); + assertThat(Quantiles.extractJobId("foo"), is(nullValue())); + assertThat(Quantiles.extractJobId("_quantiles"), is(nullValue())); + assertThat(Quantiles.extractJobId("foo_model_state_3141341341"), is(nullValue())); + } + public void testEquals_GivenSameObject() { Quantiles quantiles = new Quantiles("foo", new Date(0L), "foo"); assertTrue(quantiles.equals(quantiles)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index 2e0871a7a57d9..b07b025e09e56 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -18,13 +18,24 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator; +import java.util.Arrays; import java.util.Collections; import java.util.Deque; +import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.function.Function; +/** + * If for any reason a job is deleted by some of its state documents + * are left behind, this class deletes any unused documents stored + * in the .ml-state index. + */ public class UnusedStateRemover implements MlDataRemover { private static final Logger LOGGER = Loggers.getLogger(UnusedStateRemover.class); @@ -58,12 +69,11 @@ private BulkRequestBuilder findUnusedStateDocs() { while (stateDocIdsIterator.hasNext()) { Deque stateDocIds = stateDocIdsIterator.next(); for (String stateDocId : stateDocIds) { - int modelStateSuffixIndex = stateDocId.indexOf("_model_state_"); - if (modelStateSuffixIndex < 0) { - // e.g. quantiles, etc. + String jobId = JobIdExtractor.extractJobId(stateDocId); + if (jobId == null) { + // not a managed state document id continue; } - String jobId = stateDocId.substring(0, modelStateSuffixIndex); if (jobIds.contains(jobId) == false) { deleteUnusedStateRequestBuilder.add(new DeleteRequest( AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId)); @@ -83,16 +93,16 @@ private Set getJobIds() { } private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener listener) { - LOGGER.info("Found {} unused model state documents; attempting to delete", + LOGGER.info("Found [{}] unused state documents; attempting to delete", deleteUnusedStateRequestBuilder.numberOfActions()); deleteUnusedStateRequestBuilder.execute(new ActionListener() { @Override public void onResponse(BulkResponse bulkItemResponses) { if (bulkItemResponses.hasFailures()) { - LOGGER.error("Some unused model state documents could not be deleted due to failures: {}", + LOGGER.error("Some unused state documents could not be deleted due to failures: {}", bulkItemResponses.buildFailureMessage()); } else { - LOGGER.info("Successfully deleted unused model state documents"); + LOGGER.info("Successfully deleted all unused state documents"); } listener.onResponse(true); } @@ -104,4 +114,21 @@ public void onFailure(Exception e) { } }); } + + private static class JobIdExtractor { + + private static List> extractors = Arrays.asList( + ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId); + + private static String extractJobId(String docId) { + String jobId; + for (Function extractor : extractors) { + jobId = extractor.apply(docId); + if (jobId != null) { + return jobId; + } + } + return null; + } + } } diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 62dbd1e33d8f4..23bd5c7f7ddf1 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -182,8 +182,8 @@ public void testDeleteExpiredData() throws Exception { BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int i = 0; i < 10010; i++) { - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", - "non_existing_job_model_state_" + i); + String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId); indexRequest.source(Collections.emptyMap()); bulkRequestBuilder.add(indexRequest); } From 84f492329faac361c47127083effde35d1121fe3 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 17 May 2018 13:59:26 +0300 Subject: [PATCH 3/3] Uset lastIndexOf --- .../core/ml/job/process/autodetect/state/CategorizerState.java | 2 +- .../xpack/core/ml/job/process/autodetect/state/ModelState.java | 2 +- .../xpack/core/ml/job/process/autodetect/state/Quantiles.java | 2 +- .../ml/job/process/autodetect/state/CategorizerStateTests.java | 1 + .../core/ml/job/process/autodetect/state/ModelStateTests.java | 1 + .../core/ml/job/process/autodetect/state/QuantilesTests.java | 1 + 6 files changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java index c1ee6daca063f..2d68a6d7cf7a0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java @@ -43,7 +43,7 @@ public static final String v54DocumentPrefix(String jobId) { * @return the job id or {@code null} if the id is not valid */ public static final String extractJobId(String docId) { - int suffixIndex = docId.indexOf("_" + TYPE); + int suffixIndex = docId.lastIndexOf("_" + TYPE); return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java index 27d9fbdc6fc49..fbec7bb6c7291 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java @@ -35,7 +35,7 @@ public static final String v54DocumentId(String jobId, String snapshotId, int do * @return the job id or {@code null} if the id is not valid */ public static final String extractJobId(String docId) { - int suffixIndex = docId.indexOf("_" + TYPE + "_"); + int suffixIndex = docId.lastIndexOf("_" + TYPE + "_"); return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java index 8a7c4d9d2c5b0..0b3ddcc7b5197 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java @@ -66,7 +66,7 @@ public static String v54DocumentId(String jobId) { * @return the job id or {@code null} if the id is not valid */ public static final String extractJobId(String docId) { - int suffixIndex = docId.indexOf("_" + TYPE); + int suffixIndex = docId.lastIndexOf("_" + TYPE); return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java index df927bd699a0d..726288faffbc7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerStateTests.java @@ -17,6 +17,7 @@ public void testExtractJobId_GivenValidDocId() { assertThat(CategorizerState.extractJobId("foo_categorizer_state#1"), equalTo("foo")); assertThat(CategorizerState.extractJobId("bar_categorizer_state#2"), equalTo("bar")); assertThat(CategorizerState.extractJobId("foo_bar_categorizer_state#3"), equalTo("foo_bar")); + assertThat(CategorizerState.extractJobId("_categorizer_state_categorizer_state#3"), equalTo("_categorizer_state")); } public void testExtractJobId_GivenInvalidDocId() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java index 70b1615cf0706..0e42a06111931 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelStateTests.java @@ -17,6 +17,7 @@ public void testExtractJobId_GivenValidDocId() { assertThat(ModelState.extractJobId("foo_model_state_3151373783#1"), equalTo("foo")); assertThat(ModelState.extractJobId("bar_model_state_451515#3"), equalTo("bar")); assertThat(ModelState.extractJobId("foo_bar_model_state_blah_blah"), equalTo("foo_bar")); + assertThat(ModelState.extractJobId("_model_state_model_state_11111"), equalTo("_model_state")); } public void testExtractJobId_GivenInvalidDocId() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java index b97524d7fd8e8..146e3ed5bd539 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java @@ -25,6 +25,7 @@ public void testExtractJobId_GivenValidDocId() { assertThat(Quantiles.extractJobId("foo_quantiles"), equalTo("foo")); assertThat(Quantiles.extractJobId("bar_quantiles"), equalTo("bar")); assertThat(Quantiles.extractJobId("foo_bar_quantiles"), equalTo("foo_bar")); + assertThat(Quantiles.extractJobId("_quantiles_quantiles"), equalTo("_quantiles")); } public void testExtractJobId_GivenInvalidDocId() {