Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ public static final String v54DocumentPrefix(String jobId) {
return jobId + "#";
}

/**
* Given the id of a categorizer state document it extracts the job id
* @param docId the categorizer state document id
* @return the job id or {@code null} if the id is not valid
*/
public static final String extractJobId(String docId) {
int suffixIndex = docId.lastIndexOf("_" + TYPE);
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
}

private CategorizerState() {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ public static final String v54DocumentId(String jobId, String snapshotId, int do
return jobId + "-" + snapshotId + "#" + docNum;
}

/**
* Given the id of a state document it extracts the job id
* @param docId the state document id
* @return the job id or {@code null} if the id is not valid
*/
public static final String extractJobId(String docId) {
int suffixIndex = docId.lastIndexOf("_" + TYPE + "_");
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
}

private ModelState() {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ public static String v54DocumentId(String jobId) {
return jobId + "-" + TYPE;
}

/**
* Given the id of a quantiles document it extracts the job id
* @param docId the quantiles document id
* @return the job id or {@code null} if the id is not valid
*/
public static final String extractJobId(String docId) {
int suffixIndex = docId.lastIndexOf("_" + TYPE);
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
}

private final String jobId;
private final Date timestamp;
private final String quantileState;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;

import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;

public class CategorizerStateTests extends ESTestCase {

public void testExtractJobId_GivenValidDocId() {
assertThat(CategorizerState.extractJobId("foo_categorizer_state#1"), equalTo("foo"));
assertThat(CategorizerState.extractJobId("bar_categorizer_state#2"), equalTo("bar"));
assertThat(CategorizerState.extractJobId("foo_bar_categorizer_state#3"), equalTo("foo_bar"));
assertThat(CategorizerState.extractJobId("_categorizer_state_categorizer_state#3"), equalTo("_categorizer_state"));
}

public void testExtractJobId_GivenInvalidDocId() {
assertThat(CategorizerState.extractJobId(""), is(nullValue()));
assertThat(CategorizerState.extractJobId("foo"), is(nullValue()));
assertThat(CategorizerState.extractJobId("_categorizer_state"), is(nullValue()));
assertThat(CategorizerState.extractJobId("foo_model_state_3141341341"), is(nullValue()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;

import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;

public class ModelStateTests extends ESTestCase {

public void testExtractJobId_GivenValidDocId() {
assertThat(ModelState.extractJobId("foo_model_state_3151373783#1"), equalTo("foo"));
assertThat(ModelState.extractJobId("bar_model_state_451515#3"), equalTo("bar"));
assertThat(ModelState.extractJobId("foo_bar_model_state_blah_blah"), equalTo("foo_bar"));
assertThat(ModelState.extractJobId("_model_state_model_state_11111"), equalTo("_model_state"));
}

public void testExtractJobId_GivenInvalidDocId() {
assertThat(ModelState.extractJobId(""), is(nullValue()));
assertThat(ModelState.extractJobId("foo"), is(nullValue()));
assertThat(ModelState.extractJobId("_model_3141341341"), is(nullValue()));
assertThat(ModelState.extractJobId("_state_3141341341"), is(nullValue()));
assertThat(ModelState.extractJobId("_model_state_3141341341"), is(nullValue()));
assertThat(ModelState.extractJobId("foo_quantiles"), is(nullValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,26 @@
import java.util.Date;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;

public class QuantilesTests extends AbstractSerializingTestCase<Quantiles> {

public void testExtractJobId_GivenValidDocId() {
assertThat(Quantiles.extractJobId("foo_quantiles"), equalTo("foo"));
assertThat(Quantiles.extractJobId("bar_quantiles"), equalTo("bar"));
assertThat(Quantiles.extractJobId("foo_bar_quantiles"), equalTo("foo_bar"));
assertThat(Quantiles.extractJobId("_quantiles_quantiles"), equalTo("_quantiles"));
}

public void testExtractJobId_GivenInvalidDocId() {
assertThat(Quantiles.extractJobId(""), is(nullValue()));
assertThat(Quantiles.extractJobId("foo"), is(nullValue()));
assertThat(Quantiles.extractJobId("_quantiles"), is(nullValue()));
assertThat(Quantiles.extractJobId("foo_model_state_3141341341"), is(nullValue()));
}

public void testEquals_GivenSameObject() {
Quantiles quantiles = new Quantiles("foo", new Date(0L), "foo");
assertTrue(quantiles.equals(quantiles));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;

Expand Down Expand Up @@ -56,7 +57,8 @@ private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response>
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredForecastsRemover(client),
new ExpiredModelSnapshotsRemover(client, clusterService)
new ExpiredModelSnapshotsRemover(client, clusterService),
new UnusedStateRemover(client, clusterService)
);
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private SearchResponse initScroll() {
searchRequest.source(new SearchSourceBuilder()
.size(BATCH_SIZE)
.query(getQuery())
.fetchSource(shouldFetchSource())
.sort(SortBuilders.fieldSort(ElasticsearchMappings.ES_DOC)));

SearchResponse searchResponse = client.search(searchRequest).actionGet();
Expand All @@ -123,6 +124,14 @@ private Deque<T> mapHits(SearchResponse searchResponse) {
return results;
}

/**
* Should fetch source? Defaults to {@code true}
* @return whether the source should be fetched
*/
protected boolean shouldFetchSource() {
return true;
}

/**
* Get the query to use for the search
* @return the search query
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;

import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

/**
* Iterates through the state doc ids
*/
public class BatchedStateDocIdsIterator extends BatchedDocumentsIterator<String> {

public BatchedStateDocIdsIterator(Client client, String index) {
super(client, index);
}

@Override
protected boolean shouldFetchSource() {
return false;
}

@Override
protected QueryBuilder getQuery() {
return QueryBuilders.matchAllQuery();
}

@Override
protected String map(SearchHit hit) {
return hit.getId();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.retention;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;

import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

/**
* If for any reason a job is deleted by some of its state documents
* are left behind, this class deletes any unused documents stored
* in the .ml-state index.
*/
public class UnusedStateRemover implements MlDataRemover {

private static final Logger LOGGER = Loggers.getLogger(UnusedStateRemover.class);

private final Client client;
private final ClusterService clusterService;

public UnusedStateRemover(Client client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
}

@Override
public void remove(ActionListener<Boolean> listener) {
try {
BulkRequestBuilder deleteUnusedStateRequestBuilder = findUnusedStateDocs();
if (deleteUnusedStateRequestBuilder.numberOfActions() > 0) {
executeDeleteUnusedStateDocs(deleteUnusedStateRequestBuilder, listener);
} else {
listener.onResponse(true);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

private BulkRequestBuilder findUnusedStateDocs() {
Set<String> jobIds = getJobIds();
BulkRequestBuilder deleteUnusedStateRequestBuilder = client.prepareBulk();
BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client, AnomalyDetectorsIndex.jobStateIndexName());
while (stateDocIdsIterator.hasNext()) {
Deque<String> stateDocIds = stateDocIdsIterator.next();
for (String stateDocId : stateDocIds) {
String jobId = JobIdExtractor.extractJobId(stateDocId);
if (jobId == null) {
// not a managed state document id
continue;
}
if (jobIds.contains(jobId) == false) {
deleteUnusedStateRequestBuilder.add(new DeleteRequest(
AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId));
}
}
}
return deleteUnusedStateRequestBuilder;
}

private Set<String> getJobIds() {
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata != null) {
return mlMetadata.getJobs().keySet();
}
return Collections.emptySet();
}

private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {
LOGGER.info("Found [{}] unused state documents; attempting to delete",
deleteUnusedStateRequestBuilder.numberOfActions());
deleteUnusedStateRequestBuilder.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (bulkItemResponses.hasFailures()) {
LOGGER.error("Some unused state documents could not be deleted due to failures: {}",
bulkItemResponses.buildFailureMessage());
} else {
LOGGER.info("Successfully deleted all unused state documents");
}
listener.onResponse(true);
}

@Override
public void onFailure(Exception e) {
LOGGER.error("Error deleting unused model state documents: ", e);
listener.onFailure(e);
}
});
}

private static class JobIdExtractor {

private static List<Function<String, String>> extractors = Arrays.asList(
ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId);

private static String extractJobId(String docId) {
String jobId;
for (Function<String, String> extractor : extractors) {
jobId = extractor.apply(docId);
if (jobId != null) {
return jobId;
}
}
return null;
}
}
}
Loading