Skip to content

Commit 8d2b5ec

Browse files
[ML] Delete unused data frame analytics state
This commit adds removal of unused data frame analytics state from the _delete_expired_data API (and in extend th ML daily maintenance task). At the moment the potential state docs include the progress document and state for regression and classification analyses.
1 parent 9dec996 commit 8d2b5ec

File tree

23 files changed

+221
-17
lines changed

23 files changed

+221
-17
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.core.ml.dataframe;
77

88
import org.elasticsearch.Version;
9+
import org.elasticsearch.common.Nullable;
910
import org.elasticsearch.common.ParseField;
1011
import org.elasticsearch.common.Strings;
1112
import org.elasticsearch.common.io.stream.StreamInput;
@@ -310,6 +311,15 @@ public static String documentId(String id) {
310311
return TYPE + "-" + id;
311312
}
312313

314+
/**
315+
* Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
316+
*/
317+
@Nullable
318+
public static String extractJobIdFromDocId(String docId) {
319+
String jobId = docId.replaceAll("^" + TYPE +"-", "");
320+
return jobId.equals(docId) ? null : jobId;
321+
}
322+
313323
public static class Builder {
314324

315325
private String id;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class Classification implements DataFrameAnalysis {
3939
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
4040
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
4141

42+
private static final String STATE_DOC_ID_SUFFIX = "_classification_state#1";
43+
4244
private static final ConstructingObjectParser<Classification, Void> LENIENT_PARSER = createParser(true);
4345
private static final ConstructingObjectParser<Classification, Void> STRICT_PARSER = createParser(false);
4446

@@ -256,7 +258,12 @@ public boolean persistsState() {
256258

257259
@Override
258260
public String getStateDocId(String jobId) {
259-
return jobId + "_classification_state#1";
261+
return jobId + STATE_DOC_ID_SUFFIX;
262+
}
263+
264+
public static String extractJobIdFromStateDoc(String stateDocId) {
265+
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
266+
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
260267
}
261268

262269
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class Regression implements DataFrameAnalysis {
3636
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
3737
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
3838

39+
private static final String STATE_DOC_ID_SUFFIX = "_regression_state#1";
40+
3941
private static final ConstructingObjectParser<Regression, Void> LENIENT_PARSER = createParser(true);
4042
private static final ConstructingObjectParser<Regression, Void> STRICT_PARSER = createParser(false);
4143

@@ -196,7 +198,12 @@ public boolean persistsState() {
196198

197199
@Override
198200
public String getStateDocId(String jobId) {
199-
return jobId + "_regression_state#1";
201+
return jobId + STATE_DOC_ID_SUFFIX;
202+
}
203+
204+
public static String extractJobIdFromStateDoc(String stateDocId) {
205+
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
206+
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
200207
}
201208

202209
@Override

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import static org.hamcrest.Matchers.is;
5454
import static org.hamcrest.Matchers.not;
5555
import static org.hamcrest.Matchers.notNullValue;
56+
import static org.hamcrest.Matchers.nullValue;
5657
import static org.hamcrest.Matchers.startsWith;
5758

5859
public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<DataFrameAnalyticsConfig> {
@@ -384,6 +385,13 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsBeforeItWas
384385
}
385386
}
386387

388+
public void testExtractJobIdFromDocId() {
389+
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"), equalTo("foo"));
390+
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-data_frame_analytics_config-foo"),
391+
equalTo("data_frame_analytics_config-foo"));
392+
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
393+
}
394+
387395
private static void assertTooSmall(ElasticsearchStatusException e) {
388396
assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
389397
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,9 @@ public void testGetStateDocId() {
216216
String randomId = randomAlphaOfLength(10);
217217
assertThat(classification.getStateDocId(randomId), equalTo(randomId + "_classification_state#1"));
218218
}
219+
220+
public void testExtractJobIdFromStateDoc() {
221+
assertThat(Classification.extractJobIdFromStateDoc("foo_bar-1_classification_state#1"), equalTo("foo_bar-1"));
222+
assertThat(Classification.extractJobIdFromStateDoc("noop"), is(nullValue()));
223+
}
219224
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public void testGetStateDocId() {
110110
assertThat(regression.getStateDocId(randomId), equalTo(randomId + "_regression_state#1"));
111111
}
112112

113+
public void testExtractJobIdFromStateDoc() {
114+
assertThat(Regression.extractJobIdFromStateDoc("foo_bar-1_regression_state#1"), equalTo("foo_bar-1"));
115+
assertThat(Regression.extractJobIdFromStateDoc("noop"), is(nullValue()));
116+
}
117+
113118
public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {
114119
Regression regression = createRandom();
115120
assertThat(regression.getRandomizeSeed(), is(notNullValue()));

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@
1010
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1111
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1212
import org.elasticsearch.action.bulk.BulkResponse;
13+
import org.elasticsearch.action.delete.DeleteResponse;
1314
import org.elasticsearch.action.get.GetResponse;
1415
import org.elasticsearch.action.index.IndexAction;
1516
import org.elasticsearch.action.index.IndexRequest;
1617
import org.elasticsearch.action.search.SearchResponse;
1718
import org.elasticsearch.action.support.WriteRequest;
1819
import org.elasticsearch.index.query.QueryBuilder;
1920
import org.elasticsearch.index.query.QueryBuilders;
21+
import org.elasticsearch.rest.RestStatus;
2022
import org.elasticsearch.search.SearchHit;
23+
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
2124
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
2225
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
2326
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
@@ -315,6 +318,36 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
315318
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
316319
}
317320

321+
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
322+
initialize("classification_delete_expired_data");
323+
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
324+
325+
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
326+
registerAnalytics(config);
327+
putAnalytics(config);
328+
startAnalytics(jobId);
329+
waitUntilAnalyticsIsStopped(jobId);
330+
331+
assertProgress(jobId, 100, 100, 100, 100);
332+
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
333+
assertModelStatePersisted(stateDocId());
334+
assertInferenceModelPersisted(jobId);
335+
336+
// Delete the config straight from the config index
337+
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
338+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
339+
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
340+
341+
DeleteExpiredDataAction.Response deleteExpiredDataResponse = client()
342+
.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).actionGet();
343+
assertThat(deleteExpiredDataResponse.isDeleted(), is(true));
344+
345+
client().admin().indices().refresh(new RefreshRequest(".ml-state")).actionGet();
346+
347+
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
348+
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
349+
}
350+
318351
private void initialize(String jobId) {
319352
this.jobId = jobId;
320353
this.sourceIndex = jobId + "_source_index";

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void tearDownData() {
8787
cleanUp();
8888
}
8989

90-
public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
90+
public void testDeleteExpiredData_GivenNothingToDelete() throws Exception {
9191
// Tests that nothing goes wrong when there's nothing to delete
9292
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
9393
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
4141
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
4242
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
43-
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
43+
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
4444
import org.hamcrest.Matcher;
4545
import org.hamcrest.Matchers;
4646

@@ -205,7 +205,7 @@ protected void assertProgress(String id, int reindexing, int loadingData, int an
205205
}
206206

207207
protected SearchResponse searchStoredProgress(String jobId) {
208-
String docId = DataFrameAnalyticsTask.progressDocId(jobId);
208+
String docId = StoredProgress.documentId(jobId);
209209
return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
210210
.setQuery(QueryBuilders.idsQuery().addIds(docId))
211211
.get();

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@
55
*/
66
package org.elasticsearch.xpack.ml.integration;
77

8+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
89
import org.elasticsearch.action.bulk.BulkRequestBuilder;
910
import org.elasticsearch.action.bulk.BulkResponse;
11+
import org.elasticsearch.action.delete.DeleteResponse;
1012
import org.elasticsearch.action.get.GetResponse;
1113
import org.elasticsearch.action.index.IndexRequest;
1214
import org.elasticsearch.action.search.SearchResponse;
1315
import org.elasticsearch.action.support.WriteRequest;
1416
import org.elasticsearch.common.unit.TimeValue;
17+
import org.elasticsearch.rest.RestStatus;
1518
import org.elasticsearch.search.SearchHit;
19+
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
1620
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
1721
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
1822
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
@@ -272,6 +276,36 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
272276
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
273277
}
274278

279+
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
280+
initialize("regression_delete_expired_data");
281+
indexData(sourceIndex, 100, 0);
282+
283+
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
284+
registerAnalytics(config);
285+
putAnalytics(config);
286+
startAnalytics(jobId);
287+
waitUntilAnalyticsIsStopped(jobId);
288+
289+
assertProgress(jobId, 100, 100, 100, 100);
290+
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
291+
assertModelStatePersisted(stateDocId());
292+
assertInferenceModelPersisted(jobId);
293+
294+
// Delete the config straight from the config index
295+
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
296+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
297+
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
298+
299+
DeleteExpiredDataAction.Response deleteExpiredDataResponse = client()
300+
.execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).actionGet();
301+
assertThat(deleteExpiredDataResponse.isDeleted(), is(true));
302+
303+
client().admin().indices().refresh(new RefreshRequest(".ml-state")).actionGet();
304+
305+
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
306+
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
307+
}
308+
275309
private void initialize(String jobId) {
276310
this.jobId = jobId;
277311
this.sourceIndex = jobId + "_source_index";

0 commit comments

Comments
 (0)