Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
*/
public final class AnomalyDetectorsIndex {

public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000;

private AnomalyDetectorsIndex() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
// least possible burden on Elasticsearch
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting))
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(),
AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW))
.version(Version.CURRENT.id)
.putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -58,23 +57,23 @@ public void clusterChanged(ClusterChangedEvent event) {
return;
}

if (event.metaDataChanged() == false) {
return;
}
PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event));
}
));
}

private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous,
ClusterState state) {
private void auditChangesToMlTasks(ClusterChangedEvent event) {

if (event.metaDataChanged() == false) {
return;
}

PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

if (Objects.equals(previous, current)) {
return;
Expand All @@ -92,7 +91,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis
if (currentAssignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
} else {
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
Expand All @@ -106,7 +105,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis
auditor.warning(jobId, msg);
}
} else {
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
if (jobId != null) {
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

/**
* Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds)
Expand All @@ -37,10 +39,12 @@ private void setConfigMigrationEnabled(boolean configMigrationEnabled) {
this.isConfigMigrationEnabled = configMigrationEnabled;
}


/**
* Can migration start? Returns:
* False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION}
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the .ml-config index shards are not active
* True otherwise
* @param clusterState The cluster state
* @return A boolean that dictates if config migration can start
Expand All @@ -54,12 +58,26 @@ public boolean canStartMigration(ClusterState clusterState) {
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

return mlConfigIndexIsAllocated(clusterState);
}

static boolean mlConfigIndexIsAllocated(ClusterState clusterState) {
if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
return false;
}

IndexRoutingTable routingTable = clusterState.getRoutingTable().index(AnomalyDetectorsIndex.configIndexName());
if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
return false;
}
return true;
}

/**
* Is the job a eligible for migration? Returns:
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the job is not in the cluster state
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* True otherwise i.e. the job is present, not deleting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
Expand All @@ -21,6 +23,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -29,6 +32,7 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand Down Expand Up @@ -126,19 +130,11 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster
* @param listener The success listener
*/
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {

if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
listener.onResponse(false);
return;
}

if (migrationInProgress.compareAndSet(false, true) == false) {
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
response -> {
migrationInProgress.set(false);
Expand All @@ -150,19 +146,34 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}

if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) {
createConfigIndex(ActionListener.wrap(
response -> {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
},
unMarkMigrationInProgress::onFailure
));
return;
}

if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}

snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);

List<JobsAndDatafeeds> batches = splitInBatches(clusterState);
if (batches.isEmpty()) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
response -> {
// We have successfully snapshotted the ML configs so we don't need to try again
tookConfigSnapshot.set(true);
migrateBatches(batches, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
}

Expand Down Expand Up @@ -296,13 +307,15 @@ static RemovalResult removeJobsAndDatafeeds(List<String> jobsToRemove, List<Stri
private void addJobIndexRequests(Collection<Job> jobs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS);
for (Job job : jobs) {
logger.debug("adding job to migrate: " + job.getId());
bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params));
}
}

private void addDatafeedIndexRequests(Collection<DatafeedConfig> datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) {
ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS);
for (DatafeedConfig datafeedConfig : datafeedConfigs) {
logger.debug("adding datafeed to migrate: " + datafeedConfig.getId());
bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params));
}
}
Expand All @@ -318,7 +331,6 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
return indexRequest;
}


// public for testing
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {

Expand Down Expand Up @@ -361,6 +373,30 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
);
}

private void createConfigIndex(ActionListener<Boolean> listener) {
logger.info("creating the .ml-config index");
CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName());
try
{
createIndexRequest.settings(
Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
);
createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping());
} catch (Exception e) {
logger.error("error writing the .ml-config mappings", e);
listener.onFailure(e);
return;
}

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
r -> listener.onResponse(r.isAcknowledged()),
listener::onFailure
), client.admin().indices()::create);
}

public static Job updateJobForMigration(Job job) {
Job.Builder builder = new Job.Builder(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* This class implements CRUD operation for the
* datafeed configuration document
*
* The number of datafeeds returned in a search it limited to
* {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}.
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
*/
public class DatafeedConfigProvider {

private static final Logger logger = LogManager.getLogger(DatafeedConfigProvider.class);
Expand All @@ -88,13 +97,6 @@ public class DatafeedConfigProvider {
TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
}

/**
* In most cases we expect 10s or 100s of datafeeds to be defined and
* a search for all datafeeds should return all.
* TODO this is a temporary fix
*/
public int searchSize = 1000;

public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) {
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -433,7 +435,7 @@ private SearchRequest buildExpandDatafeedIdsSearch(String expression) {
return client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();
}

Expand All @@ -458,7 +460,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);
Expand Down Expand Up @@ -514,7 +516,7 @@ public void expandDatafeedConfigsWithoutMissingCheck(String expression, ActionLi
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder)
.setSize(searchSize)
.setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)
.request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
Expand Down
Loading