From 8377956a797d0da09da32e335089a040bdd44ec6 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 3 Jan 2019 17:25:15 +0200 Subject: [PATCH 1/2] [ML] Unused state remover should also account for jobs in index The unused state remover was never adjusted to account for jobs stored in the config index. The result was that when triggered it removed state for all jobs stored in the config index. This commit fixes the issue. Closes #37109 --- .../xpack/ml/integration/DeleteExpiredDataIT.java | 5 ++++- .../xpack/ml/job/retention/UnusedStateRemover.java | 12 +++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 0f860043b67b0..24cd5c0e6d98c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -245,7 +245,10 @@ public void testDeleteExpiredData() throws Exception { .setFetchSource(false) .setSize(10000) .get(); - assertThat(stateDocsResponse.getHits().getTotalHits(), lessThan(10000L)); + + // Assert at least one state doc for each job + assertThat(stateDocsResponse.getHits().getTotalHits(), greaterThanOrEqualTo(5L)); + for (SearchHit hit : stateDocsResponse.getHits().getHits()) { assertThat(hit.getId().startsWith("non_existing_job"), is(false)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index ea094dfe6b4f1..7a5cd0b044ab7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -14,15 +14,18 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator; import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator; import java.util.Arrays; import java.util.Deque; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; @@ -81,7 +84,14 @@ private BulkRequestBuilder findUnusedStateDocs() { } private Set getJobIds() { - return MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet(); + Set jobIds = new HashSet<>(); + jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet()); + BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()); + while (jobsIterator.hasNext()) { + Deque jobs = jobsIterator.next(); + jobs.stream().map(Job.Builder::getId).forEach(jobIds::add); + } + return jobIds; } private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener listener) { From 57248e5350c59f1ff063912a10f6ce3b84540fb6 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Thu, 3 Jan 2019 17:58:57 +0200 Subject: [PATCH 2/2] Adds TODO and removes unused import --- .../xpack/ml/integration/DeleteExpiredDataIT.java | 1 - .../xpack/ml/job/retention/UnusedStateRemover.java | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 24cd5c0e6d98c..2938ced6cac44 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -43,7 +43,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index 7a5cd0b044ab7..66030c56823c9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -85,7 +85,11 @@ private BulkRequestBuilder findUnusedStateDocs() { private Set getJobIds() { Set jobIds = new HashSet<>(); + + // TODO Once at 8.0, we can stop searching for jobs in cluster state + // and remove cluster service as a member all together. jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet()); + BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName()); while (jobsIterator.hasNext()) { Deque jobs = jobsIterator.next();