Skip to content

Commit ec98d15

Browse files
[ML] Clean left behind model state docs (#30659)
It is possible for state documents to be left behind in the state index. This may be because of bugs or uncontrollable scenarios. In any case, those documents may take up quite some disk space when they add up. This commit adds a step in the expired data deletion that is part of the daily maintenance service. The new step searches for state documents that do not belong to any of the current jobs and deletes them. Closes #30551
1 parent 9831b59 commit ec98d15

File tree

11 files changed

+324
-2
lines changed

11 files changed

+324
-2
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/CategorizerState.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ public static final String v54DocumentPrefix(String jobId) {
3737
return jobId + "#";
3838
}
3939

40+
/**
41+
* Given the id of a categorizer state document it extracts the job id
42+
* @param docId the categorizer state document id
43+
* @return the job id or {@code null} if the id is not valid
44+
*/
45+
public static final String extractJobId(String docId) {
46+
int suffixIndex = docId.lastIndexOf("_" + TYPE);
47+
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
48+
}
49+
4050
private CategorizerState() {
4151
}
4252
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/ModelState.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@ public static final String v54DocumentId(String jobId, String snapshotId, int do
2929
return jobId + "-" + snapshotId + "#" + docNum;
3030
}
3131

32+
/**
33+
* Given the id of a state document it extracts the job id
34+
* @param docId the state document id
35+
* @return the job id or {@code null} if the id is not valid
36+
*/
37+
public static final String extractJobId(String docId) {
38+
int suffixIndex = docId.lastIndexOf("_" + TYPE + "_");
39+
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
40+
}
41+
3242
private ModelState() {
3343
}
3444
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/Quantiles.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ public static String v54DocumentId(String jobId) {
6060
return jobId + "-" + TYPE;
6161
}
6262

63+
/**
64+
* Given the id of a quantiles document it extracts the job id
65+
* @param docId the quantiles document id
66+
* @return the job id or {@code null} if the id is not valid
67+
*/
68+
public static final String extractJobId(String docId) {
69+
int suffixIndex = docId.lastIndexOf("_" + TYPE);
70+
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
71+
}
72+
6373
private final String jobId;
6474
private final Date timestamp;
6575
private final String quantileState;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
7+
8+
import org.elasticsearch.test.ESTestCase;
9+
10+
import static org.hamcrest.Matchers.equalTo;
11+
import static org.hamcrest.Matchers.nullValue;
12+
import static org.hamcrest.core.Is.is;
13+
14+
public class CategorizerStateTests extends ESTestCase {
15+
16+
public void testExtractJobId_GivenValidDocId() {
17+
assertThat(CategorizerState.extractJobId("foo_categorizer_state#1"), equalTo("foo"));
18+
assertThat(CategorizerState.extractJobId("bar_categorizer_state#2"), equalTo("bar"));
19+
assertThat(CategorizerState.extractJobId("foo_bar_categorizer_state#3"), equalTo("foo_bar"));
20+
assertThat(CategorizerState.extractJobId("_categorizer_state_categorizer_state#3"), equalTo("_categorizer_state"));
21+
}
22+
23+
public void testExtractJobId_GivenInvalidDocId() {
24+
assertThat(CategorizerState.extractJobId(""), is(nullValue()));
25+
assertThat(CategorizerState.extractJobId("foo"), is(nullValue()));
26+
assertThat(CategorizerState.extractJobId("_categorizer_state"), is(nullValue()));
27+
assertThat(CategorizerState.extractJobId("foo_model_state_3141341341"), is(nullValue()));
28+
}
29+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
7+
8+
import org.elasticsearch.test.ESTestCase;
9+
10+
import static org.hamcrest.Matchers.equalTo;
11+
import static org.hamcrest.Matchers.nullValue;
12+
import static org.hamcrest.core.Is.is;
13+
14+
public class ModelStateTests extends ESTestCase {
15+
16+
public void testExtractJobId_GivenValidDocId() {
17+
assertThat(ModelState.extractJobId("foo_model_state_3151373783#1"), equalTo("foo"));
18+
assertThat(ModelState.extractJobId("bar_model_state_451515#3"), equalTo("bar"));
19+
assertThat(ModelState.extractJobId("foo_bar_model_state_blah_blah"), equalTo("foo_bar"));
20+
assertThat(ModelState.extractJobId("_model_state_model_state_11111"), equalTo("_model_state"));
21+
}
22+
23+
public void testExtractJobId_GivenInvalidDocId() {
24+
assertThat(ModelState.extractJobId(""), is(nullValue()));
25+
assertThat(ModelState.extractJobId("foo"), is(nullValue()));
26+
assertThat(ModelState.extractJobId("_model_3141341341"), is(nullValue()));
27+
assertThat(ModelState.extractJobId("_state_3141341341"), is(nullValue()));
28+
assertThat(ModelState.extractJobId("_model_state_3141341341"), is(nullValue()));
29+
assertThat(ModelState.extractJobId("foo_quantiles"), is(nullValue()));
30+
}
31+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/QuantilesTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,26 @@
1515
import java.util.Date;
1616

1717
import static org.hamcrest.Matchers.containsString;
18+
import static org.hamcrest.Matchers.equalTo;
19+
import static org.hamcrest.Matchers.nullValue;
20+
import static org.hamcrest.core.Is.is;
1821

1922
public class QuantilesTests extends AbstractSerializingTestCase<Quantiles> {
2023

24+
public void testExtractJobId_GivenValidDocId() {
25+
assertThat(Quantiles.extractJobId("foo_quantiles"), equalTo("foo"));
26+
assertThat(Quantiles.extractJobId("bar_quantiles"), equalTo("bar"));
27+
assertThat(Quantiles.extractJobId("foo_bar_quantiles"), equalTo("foo_bar"));
28+
assertThat(Quantiles.extractJobId("_quantiles_quantiles"), equalTo("_quantiles"));
29+
}
30+
31+
public void testExtractJobId_GivenInvalidDocId() {
32+
assertThat(Quantiles.extractJobId(""), is(nullValue()));
33+
assertThat(Quantiles.extractJobId("foo"), is(nullValue()));
34+
assertThat(Quantiles.extractJobId("_quantiles"), is(nullValue()));
35+
assertThat(Quantiles.extractJobId("foo_model_state_3141341341"), is(nullValue()));
36+
}
37+
2138
public void testEquals_GivenSameObject() {
2239
Quantiles quantiles = new Quantiles("foo", new Date(0L), "foo");
2340
assertTrue(quantiles.equals(quantiles));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
2323
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
2424
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
25+
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
2526
import org.elasticsearch.xpack.ml.notifications.Auditor;
2627
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
2728

@@ -56,7 +57,8 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
5657
List<MlDataRemover> dataRemovers = Arrays.asList(
5758
new ExpiredResultsRemover(client, clusterService, auditor),
5859
new ExpiredForecastsRemover(client),
59-
new ExpiredModelSnapshotsRemover(client, clusterService)
60+
new ExpiredModelSnapshotsRemover(client, clusterService),
61+
new UnusedStateRemover(client, clusterService)
6062
);
6163
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
6264
deleteExpiredData(dataRemoversIterator, listener);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ private SearchResponse initScroll() {
9797
searchRequest.source(new SearchSourceBuilder()
9898
.size(BATCH_SIZE)
9999
.query(getQuery())
100+
.fetchSource(shouldFetchSource())
100101
.sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)));
101102

102103
SearchResponse searchResponse = client.search(searchRequest).actionGet();
@@ -123,6 +124,14 @@ private Deque<T> mapHits(SearchResponse searchResponse) {
123124
return results;
124125
}
125126

127+
/**
128+
* Should fetch source? Defaults to {@code true}
129+
* @return whether the source should be fetched
130+
*/
131+
protected boolean shouldFetchSource() {
132+
return true;
133+
}
134+
126135
/**
127136
* Get the query to use for the search
128137
* @return the search query
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.job.persistence;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.index.query.QueryBuilder;
10+
import org.elasticsearch.index.query.QueryBuilders;
11+
import org.elasticsearch.search.SearchHit;
12+
13+
/**
14+
* Iterates through the state doc ids
15+
*/
16+
public class BatchedStateDocIdsIterator extends BatchedDocumentsIterator<String> {
17+
18+
public BatchedStateDocIdsIterator(Client client, String index) {
19+
super(client, index);
20+
}
21+
22+
@Override
23+
protected boolean shouldFetchSource() {
24+
return false;
25+
}
26+
27+
@Override
28+
protected QueryBuilder getQuery() {
29+
return QueryBuilders.matchAllQuery();
30+
}
31+
32+
@Override
33+
protected String map(SearchHit hit) {
34+
return hit.getId();
35+
}
36+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml.job.retention;
7+
8+
import org.apache.logging.log4j.Logger;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
11+
import org.elasticsearch.action.bulk.BulkResponse;
12+
import org.elasticsearch.action.delete.DeleteRequest;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.common.logging.Loggers;
17+
import org.elasticsearch.xpack.core.ml.MLMetadataField;
18+
import org.elasticsearch.xpack.core.ml.MlMetadata;
19+
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
20+
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
21+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
22+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
23+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
24+
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
25+
26+
import java.util.Arrays;
27+
import java.util.Collections;
28+
import java.util.Deque;
29+
import java.util.List;
30+
import java.util.Objects;
31+
import java.util.Set;
32+
import java.util.function.Function;
33+
34+
/**
35+
* If for any reason a job is deleted by some of its state documents
36+
* are left behind, this class deletes any unused documents stored
37+
* in the .ml-state index.
38+
*/
39+
public class UnusedStateRemover implements MlDataRemover {
40+
41+
private static final Logger LOGGER = Loggers.getLogger(UnusedStateRemover.class);
42+
43+
private final Client client;
44+
private final ClusterService clusterService;
45+
46+
public UnusedStateRemover(Client client, ClusterService clusterService) {
47+
this.client = Objects.requireNonNull(client);
48+
this.clusterService = Objects.requireNonNull(clusterService);
49+
}
50+
51+
@Override
52+
public void remove(ActionListener<Boolean> listener) {
53+
try {
54+
BulkRequestBuilder deleteUnusedStateRequestBuilder = findUnusedStateDocs();
55+
if (deleteUnusedStateRequestBuilder.numberOfActions() > 0) {
56+
executeDeleteUnusedStateDocs(deleteUnusedStateRequestBuilder, listener);
57+
} else {
58+
listener.onResponse(true);
59+
}
60+
} catch (Exception e) {
61+
listener.onFailure(e);
62+
}
63+
}
64+
65+
private BulkRequestBuilder findUnusedStateDocs() {
66+
Set<String> jobIds = getJobIds();
67+
BulkRequestBuilder deleteUnusedStateRequestBuilder = client.prepareBulk();
68+
BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client, AnomalyDetectorsIndex.jobStateIndexName());
69+
while (stateDocIdsIterator.hasNext()) {
70+
Deque<String> stateDocIds = stateDocIdsIterator.next();
71+
for (String stateDocId : stateDocIds) {
72+
String jobId = JobIdExtractor.extractJobId(stateDocId);
73+
if (jobId == null) {
74+
// not a managed state document id
75+
continue;
76+
}
77+
if (jobIds.contains(jobId) == false) {
78+
deleteUnusedStateRequestBuilder.add(new DeleteRequest(
79+
AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId));
80+
}
81+
}
82+
}
83+
return deleteUnusedStateRequestBuilder;
84+
}
85+
86+
private Set<String> getJobIds() {
87+
ClusterState clusterState = clusterService.state();
88+
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
89+
if (mlMetadata != null) {
90+
return mlMetadata.getJobs().keySet();
91+
}
92+
return Collections.emptySet();
93+
}
94+
95+
private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {
96+
LOGGER.info("Found [{}] unused state documents; attempting to delete",
97+
deleteUnusedStateRequestBuilder.numberOfActions());
98+
deleteUnusedStateRequestBuilder.execute(new ActionListener<BulkResponse>() {
99+
@Override
100+
public void onResponse(BulkResponse bulkItemResponses) {
101+
if (bulkItemResponses.hasFailures()) {
102+
LOGGER.error("Some unused state documents could not be deleted due to failures: {}",
103+
bulkItemResponses.buildFailureMessage());
104+
} else {
105+
LOGGER.info("Successfully deleted all unused state documents");
106+
}
107+
listener.onResponse(true);
108+
}
109+
110+
@Override
111+
public void onFailure(Exception e) {
112+
LOGGER.error("Error deleting unused model state documents: ", e);
113+
listener.onFailure(e);
114+
}
115+
});
116+
}
117+
118+
private static class JobIdExtractor {
119+
120+
private static List<Function<String, String>> extractors = Arrays.asList(
121+
ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId);
122+
123+
private static String extractJobId(String docId) {
124+
String jobId;
125+
for (Function<String, String> extractor : extractors) {
126+
jobId = extractor.apply(docId);
127+
if (jobId != null) {
128+
return jobId;
129+
}
130+
}
131+
return null;
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)