Skip to content

Commit 0fd27d4

Browse files
[ML] Unused state remover should also account for jobs in index (#37119)
The unused state remover was never adjusted to account for jobs stored in the config index. The result was that, when triggered, it removed state for all jobs stored in the config index. This commit fixes the issue. Closes #37109
1 parent bfe6f09 commit 0fd27d4

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import static org.hamcrest.Matchers.greaterThan;
4444
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4545
import static org.hamcrest.Matchers.is;
46-
import static org.hamcrest.Matchers.lessThan;
4746
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4847

4948
public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
@@ -244,7 +243,10 @@ public void testDeleteExpiredData() throws Exception {
244243
.setFetchSource(false)
245244
.setSize(10000)
246245
.get();
247-
assertThat(stateDocsResponse.getHits().getTotalHits().value, lessThan(10000L));
246+
247+
// Assert at least one state doc for each job
248+
assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L));
249+
248250
for (SearchHit hit : stateDocsResponse.getHits().getHits()) {
249251
assertThat(hit.getId().startsWith("non_existing_job"), is(false));
250252
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@
1414
import org.elasticsearch.client.Client;
1515
import org.elasticsearch.cluster.service.ClusterService;
1616
import org.elasticsearch.xpack.core.ml.MlMetadata;
17+
import org.elasticsearch.xpack.core.ml.job.config.Job;
1718
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
1819
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
1920
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
2021
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
2122
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
23+
import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
2224
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
2325

2426
import java.util.Arrays;
2527
import java.util.Deque;
28+
import java.util.HashSet;
2629
import java.util.List;
2730
import java.util.Objects;
2831
import java.util.Set;
@@ -81,7 +84,18 @@ private BulkRequestBuilder findUnusedStateDocs() {
8184
}
8285

8386
private Set<String> getJobIds() {
84-
return MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet();
87+
Set<String> jobIds = new HashSet<>();
88+
89+
// TODO Once at 8.0, we can stop searching for jobs in cluster state
90+
// and remove cluster service as a member altogether.
91+
jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet());
92+
93+
BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
94+
while (jobsIterator.hasNext()) {
95+
Deque<Job.Builder> jobs = jobsIterator.next();
96+
jobs.stream().map(Job.Builder::getId).forEach(jobIds::add);
97+
}
98+
return jobIds;
8599
}
86100

87101
private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {

0 commit comments

Comments
 (0)