From 94402e42a32a3e694bcfc140aa4e7dd1281b256e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 13 Dec 2018 17:09:13 +0000 Subject: [PATCH 01/12] Create the ml-config index --- .../ml/MlConfigMigrationEligibilityCheck.java | 18 ++++++ .../xpack/ml/MlConfigMigrator.java | 55 +++++++++++++++---- ...lConfigMigrationEligibilityCheckTests.java | 18 ++++++ .../xpack/ml/MlConfigMigratorTests.java | 14 +++++ .../ml/integration/MlConfigMigratorIT.java | 46 +++++++++++++++- .../xpack/ml/job/JobManagerTests.java | 34 +++++++++++- 6 files changed, 169 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java index 0f127919ac3d0..72cb52424c3b1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java @@ -7,6 +7,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -14,6 +15,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; /** * Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds) @@ -37,10 +39,12 @@ private void setConfigMigrationEnabled(boolean configMigrationEnabled) { this.isConfigMigrationEnabled = configMigrationEnabled; } + /** * Can migration start? Returns: * False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION} * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} + * False if the .ml-config index shards are not active * True otherwise * @param clusterState The cluster state * @return A boolean that dictates if config migration can start @@ -54,12 +58,26 @@ public boolean canStartMigration(ClusterState clusterState) { if (minNodeVersion.before(MIN_NODE_VERSION)) { return false; } + + return mlConfigIndexIsAllocated(clusterState); + } + + static boolean mlConfigIndexIsAllocated(ClusterState clusterState) { + if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + return false; + } + + IndexRoutingTable routingTable = clusterState.getRoutingTable().index(AnomalyDetectorsIndex.configIndexName()); + if (routingTable == null || routingTable.allPrimaryShardsActive() == false) { + return false; + } return true; } /** * Is the job a eligible for migration? Returns: * False if {@link #canStartMigration(ClusterState)} returns {@code false} + * False if the job is not in the cluster state * False if the {@link Job#isDeleting()} * False if the job has a persistent task * True otherwise i.e. the job is present, not deleting diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index c3b9626ffd042..30624efc587e4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -21,6 +23,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -29,6 +32,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -90,6 +94,7 @@ public class MlConfigMigrator { private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class); public static final String MIGRATED_FROM_VERSION = "migrated from version"; + static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000; static final int MAX_BULK_WRITE_SIZE = 100; @@ -99,7 +104,6 @@ public class MlConfigMigrator { private final AtomicBoolean migrationInProgress; private final AtomicBoolean tookConfigSnapshot; - public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); @@ -126,19 +130,11 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster * @param listener The success listener */ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { - - if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { - listener.onResponse(false); - return; - } - if (migrationInProgress.compareAndSet(false, true) == false) { listener.onResponse(Boolean.FALSE); return; } - logger.debug("migrating ml configurations"); - ActionListener unMarkMigrationInProgress = ActionListener.wrap( response -> { migrationInProgress.set(false); @@ -150,11 +146,21 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); + if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + logger.debug("creating the .ml-config index"); + createConfigIndex(unMarkMigrationInProgress); + return; + } + + if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { + unMarkMigrationInProgress.onResponse(false); + return; + } + snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( response -> { // We have successfully snapshotted the ML configs so we don't need to try again tookConfigSnapshot.set(true); - List batches = splitInBatches(clusterState); if (batches.isEmpty()) { unMarkMigrationInProgress.onResponse(Boolean.FALSE); @@ -162,7 +168,7 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } migrateBatches(batches, unMarkMigrationInProgress); }, - unMarkMigrationInProgress::onFailure + unMarkMigrationInProgress::onFailure )); } @@ -296,6 +302,7 @@ static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List jobs, BulkRequestBuilder bulkRequestBuilder) { ToXContent.Params params = new ToXContent.MapParams(JobConfigProvider.TO_XCONTENT_PARAMS); for (Job job : jobs) { + logger.debug("adding job to migrate: " + job.getId()); bulkRequestBuilder.add(indexRequest(job, Job.documentId(job.getId()), params)); } } @@ -303,6 +310,7 @@ private void addJobIndexRequests(Collection jobs, BulkRequestBuilder bulkRe private void addDatafeedIndexRequests(Collection datafeedConfigs, BulkRequestBuilder bulkRequestBuilder) { ToXContent.Params params = new ToXContent.MapParams(DatafeedConfigProvider.TO_XCONTENT_PARAMS); for (DatafeedConfig datafeedConfig : datafeedConfigs) { + logger.debug("adding datafeed to migrate: " + datafeedConfig.getId()); bulkRequestBuilder.add(indexRequest(datafeedConfig, DatafeedConfig.documentId(datafeedConfig.getId()), params)); } } @@ -318,7 +326,6 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To return indexRequest; } - // public for testing public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listener) { @@ -361,6 +368,30 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen ); } + private void createConfigIndex(ActionListener listener) { + logger.info("creating the ml config index"); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName()); + try + { + createIndexRequest.settings( + Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") + .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), CONFIG_INDEX_MAX_RESULTS_WINDOW) + ); + createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping()); + } catch (Exception e) { + logger.error("error writing the .ml-config mappings", e); + listener.onFailure(e); + return; + } + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, createIndexRequest, + ActionListener.wrap( + r -> listener.onResponse(r.isAcknowledged()), + listener::onFailure + ), client.admin().indices()::create); + } public static Job updateJobForMigration(Job job) { Job.Builder builder = new Job.Builder(job); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java index fec071c464104..0be3a7c99fd47 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -84,6 +85,23 @@ public void testCanStartMigration_givenNodesNotUpToVersionAndMigrationIsEnabled( assertTrue(check.canStartMigration(clusterState)); } + public void testCanStartMigration_givenMissingIndex() { + Settings settings = newSettings(true); + givenClusterSettings(settings); + + // index is present but no routing + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(metaData) + .build(); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + assertFalse(check.canStartMigration(clusterState)); + } + + public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java index d9ea035e58234..637c26f071084 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -10,7 +10,20 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -20,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import java.util.ArrayList; import java.util.Arrays; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index d98abea55535c..2cef64fb87b3d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -5,13 +5,25 @@ */ package org.elasticsearch.xpack.ml.integration; +<<<<<<< HEAD import org.elasticsearch.action.get.GetResponse; +======= +import org.elasticsearch.Version; +>>>>>>> Create the ml-config index import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; +<<<<<<< HEAD import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -25,6 +37,15 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; +======= +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +>>>>>>> Create the ml-config index import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -117,9 +138,12 @@ public void testMigrateConfigs() throws InterruptedException, IOException { builder.setIndices(Collections.singletonList("beats*")); mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) .build(); doAnswer(invocation -> { @@ -304,6 +328,24 @@ public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException { assertEquals(expectedMlMetadata, recoveredMeta); } } + + private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 871affade508d..a2962faf945e9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -8,13 +8,21 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.document.DocumentField; @@ -25,7 +33,9 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -850,9 +860,29 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { public void testUpdateJob_notAllowedPreMigration() { MlMetadata.Builder mlmetadata = new MlMetadata.Builder().putJob(buildJobBuilder("closed-job-not-migrated").build(), false); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlmetadata.build())) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlmetadata.build())) + .routingTable(routingTable.build()) .build(); when(clusterService.state()).thenReturn(clusterState); From 438079e68e3ce9eb82d8a4c57da518d7ce018de7 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Sun, 16 Dec 2018 10:52:18 +0000 Subject: [PATCH 02/12] Set results window size in template --- .../main/java/org/elasticsearch/xpack/ml/MachineLearning.java | 3 ++- .../main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index ba5a40bfbe44b..bf48b209f9b38 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -675,7 +675,8 @@ public UnaryOperator> getIndexTemplateMetaDat // least possible burden on Elasticsearch .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting)) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) + .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW)) .version(Version.CURRENT.id) .putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping)) .build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 30624efc587e4..79b09d8775a20 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -94,7 +94,7 @@ public class MlConfigMigrator { private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class); public static final String MIGRATED_FROM_VERSION = "migrated from version"; - static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000; + public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000; static final int MAX_BULK_WRITE_SIZE = 100; From b74af10c23622d4e0184f2ce352f38b681ed0bd3 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 15:16:49 +0000 Subject: [PATCH 03/12] Adjust after rebase --- ...lConfigMigrationEligibilityCheckTests.java | 88 +++++++++++++++---- .../xpack/ml/MlConfigMigratorTests.java | 14 --- .../ml/integration/MlConfigMigratorIT.java | 17 +--- 3 files changed, 75 insertions(+), 44 deletions(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java index 0be3a7c99fd47..4785f9f75a5c3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheckTests.java @@ -8,14 +8,22 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -25,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.junit.Before; import java.net.InetAddress; @@ -54,12 +63,18 @@ public void testCanStartMigration_givenMigrationIsDisabled() { } public void testCanStartMigration_givenNodesNotUpToVersion() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .routingTable(routingTable.build()) + .metaData(metaData) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -70,12 +85,18 @@ public void testCanStartMigration_givenNodesNotUpToVersion() { } public void testCanStartMigration_givenNodesNotUpToVersionAndMigrationIsEnabled() { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + // mixed 6.5 and 6.6 nodes ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .routingTable(routingTable.build()) + .metaData(metaData) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -89,6 +110,17 @@ public void testCanStartMigration_givenMissingIndex() { Settings settings = newSettings(true); givenClusterSettings(settings); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .build(); + + MlConfigMigrationEligibilityCheck check = new MlConfigMigrationEligibilityCheck(settings, clusterService); + assertFalse(check.canStartMigration(clusterState)); + } + + public void testCanStartMigration_givenInactiveShards() { + Settings settings = newSettings(true); + givenClusterSettings(settings); + // index is present but no routing MetaData.Builder metaData = MetaData.builder(); RoutingTable.Builder routingTable = RoutingTable.builder(); @@ -101,6 +133,24 @@ public void testCanStartMigration_givenMissingIndex() { assertFalse(check.canStartMigration(clusterState)); } + private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder routingTable) { + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(AnomalyDetectorsIndex.configIndexName()); + indexMetaData.settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ); + metaData.put(indexMetaData); + Index index = new Index(AnomalyDetectorsIndex.configIndexName(), "_uuid"); + ShardId shardId = new ShardId(index, 0); + ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, true, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + shardRouting = shardRouting.initialize("node_id", null, 0L); + shardRouting = shardRouting.moveToStarted(); + routingTable.add(IndexRoutingTable.builder(index) + .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); + } + public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { // mixed 6.5 and 6.6 nodes @@ -203,11 +253,14 @@ public void testJobIsEligibleForMigration_givenClosedJob() { Job closedJob = JobTests.buildJobBuilder("closed-job").build(); MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); @@ -301,11 +354,14 @@ public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); String datafeedId = "df-" + job.getId(); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); Settings settings = newSettings(true); givenClusterSettings(settings); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java index 637c26f071084..d9ea035e58234 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -10,20 +10,7 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RecoverySource; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -33,7 +20,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import java.util.ArrayList; import java.util.Arrays; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 2cef64fb87b3d..84f860cd12171 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -5,11 +5,8 @@ */ package org.elasticsearch.xpack.ml.integration; -<<<<<<< HEAD -import org.elasticsearch.action.get.GetResponse; -======= import org.elasticsearch.Version; ->>>>>>> Create the ml-config index +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -23,7 +20,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; -<<<<<<< HEAD import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -31,21 +27,14 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; -import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; -======= -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; ->>>>>>> Create the ml-config index +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; From f4a300ea6ceccae97a35383fbef946bd2c184aac Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 15:29:28 +0000 Subject: [PATCH 04/12] create once --- .../elasticsearch/xpack/ml/MlConfigMigrator.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 79b09d8775a20..62b524ebd2dfd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -104,12 +104,15 @@ public class MlConfigMigrator { private final AtomicBoolean migrationInProgress; private final AtomicBoolean tookConfigSnapshot; + private final AtomicBoolean configIndexCreated; + public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationInProgress = new AtomicBoolean(false); this.tookConfigSnapshot = new AtomicBoolean(false); + this.configIndexCreated = new AtomicBoolean(false); } /** @@ -146,9 +149,16 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); - if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { - logger.debug("creating the .ml-config index"); - createConfigIndex(unMarkMigrationInProgress); + if (configIndexCreated.get() == false && + clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + logger.info("creating the .ml-config index"); + createConfigIndex(ActionListener.wrap( + response -> { + configIndexCreated.set(true); + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + }, + unMarkMigrationInProgress::onFailure + )); return; } From 172981dd6f9d0f635b1cdaea11bceb970332fa3b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 16:37:28 +0000 Subject: [PATCH 05/12] Create the index only if there are configs to migrate --- .../main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 62b524ebd2dfd..14edf85b7ecb6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -151,7 +151,6 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener if (configIndexCreated.get() == false && clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { - logger.info("creating the .ml-config index"); createConfigIndex(ActionListener.wrap( response -> { configIndexCreated.set(true); @@ -379,7 +378,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen } private void createConfigIndex(ActionListener listener) { - logger.info("creating the ml config index"); + logger.info("creating the .ml-config index"); CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.configIndexName()); try { From 57cabfbd6ec7ea9d35067f82e51bcd6812d6d458 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 16:40:39 +0000 Subject: [PATCH 06/12] Update on all changes --- .../org/elasticsearch/xpack/ml/MlAssignmentNotifier.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index ba1000135191e..99cdb0fd26c4c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -57,10 +57,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster() == false) { return; } - - if (event.metaDataChanged() == false) { - return; - } + PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); From a3e9a6d6521d9f3957011ceae5f29ebfd7261d1e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 17:50:34 +0000 Subject: [PATCH 07/12] Tidy up assignment notifier --- .../xpack/ml/MlAssignmentNotifier.java | 22 ++++++++++--------- .../xpack/ml/MlAssignmentNotifierTests.java | 6 ++--- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 99cdb0fd26c4c..9db17ed448433 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -57,21 +56,24 @@ public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster() == false) { return; } - - PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( - response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())), + response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)), e -> { logger.error("error migrating ml configurations", e); - threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); + threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)); } )); } - private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous, - ClusterState state) { + private void auditChangesToMlTasks(ClusterChangedEvent event) { + + if (event.metaDataChanged() == false) { + return; + } + + PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (Objects.equals(previous, current)) { return; @@ -89,7 +91,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis if (currentAssignment.getExecutorNode() == null) { auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); } else { - DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { @@ -103,7 +105,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis auditor.warning(jobId, msg); } } else { - DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); if (jobId != null) { auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index e43197eb06302..5c8c253794794 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -31,7 +31,6 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -154,7 +153,8 @@ public void testClusterChanged_noPersistentTaskChanges() { .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any()); + verifyNoMoreInteractions(auditor); // no longer master newState = ClusterState.builder(new ClusterName("_name")) @@ -163,6 +163,6 @@ public void testClusterChanged_noPersistentTaskChanges() { .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) .build(); notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); - verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + verifyNoMoreInteractions(configMigrator); } } From f3c70b122f3c3219ab4c76ccf0598cc5e56ca7ec Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 17:59:37 +0000 Subject: [PATCH 08/12] Use the config index max results window size in searches --- .../persistence/DatafeedConfigProvider.java | 23 +++++++++++-------- .../ml/job/persistence/JobConfigProvider.java | 23 +++++++++---------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 15432f8a0ee3f..eb4b80c185a2b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -53,6 +53,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import java.io.IOException; @@ -73,6 +74,15 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; +/** + * This class implements CRUD operation for the + * datafeed configuration document + * + * The number of datafeeds returned in a search it limited to + * {@link MlConfigMigrator#CONFIG_INDEX_MAX_RESULTS_WINDOW}. + * In most cases we expect 10s or 100s of datafeeds to be defined and + * a search for all datafeeds should return all. + */ public class DatafeedConfigProvider { private static final Logger logger = LogManager.getLogger(DatafeedConfigProvider.class); @@ -88,13 +98,6 @@ public class DatafeedConfigProvider { TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); } - /** - * In most cases we expect 10s or 100s of datafeeds to be defined and - * a search for all datafeeds should return all. - * TODO this is a temporary fix - */ - public int searchSize = 1000; - public DatafeedConfigProvider(Client client, NamedXContentRegistry xContentRegistry) { this.client = client; this.xContentRegistry = xContentRegistry; @@ -433,7 +436,7 @@ private SearchRequest buildExpandDatafeedIdsSearch(String expression) { return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); } @@ -458,7 +461,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); @@ -514,7 +517,7 @@ public void expandDatafeedConfigsWithoutMissingCheck(String expression, ActionLi SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 9fae5da178f37..587bc3941850b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -68,6 +68,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import java.io.IOException; import java.io.InputStream; @@ -89,6 +90,11 @@ /** * This class implements CRUD operation for the * anomaly detector job configuration document + * + * The number of jobs returned in a search it limited to + * {@link MlConfigMigrator#CONFIG_INDEX_MAX_RESULTS_WINDOW}. + * In most cases we expect 10s or 100s of jobs to be defined and + * a search for all jobs should return all. */ public class JobConfigProvider { @@ -101,13 +107,6 @@ public class JobConfigProvider { TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); } - /** - * In most cases we expect 10s or 100s of jobs to be defined and - * a search for all jobs should return all. - * TODO this is a temporary fix - */ - private int searchSize = 1000; - private final Client client; public JobConfigProvider(Client client) { @@ -669,7 +668,7 @@ private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excl return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); } @@ -695,7 +694,7 @@ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDe SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); @@ -754,7 +753,7 @@ public void expandJobsWithoutMissingcheck(String expression, boolean excludeDele SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, @@ -800,7 +799,7 @@ public void expandGroupIds(List groupIds, ActionListener> listener) { SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(searchSize) + .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, From ade9b2de26a990a51dd49524c12e8db44fbddf7e Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 18:27:48 +0000 Subject: [PATCH 09/12] Remove unnecessary atomicboolean --- .../main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 14edf85b7ecb6..19224b002e34f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -151,9 +151,9 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener if (configIndexCreated.get() == false && clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + createConfigIndex(ActionListener.wrap( response -> { - configIndexCreated.set(true); unMarkMigrationInProgress.onResponse(Boolean.FALSE); }, unMarkMigrationInProgress::onFailure From 3d2b0d60f067778a3cdb0912e574dc043669ccb6 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 17 Dec 2018 18:39:56 +0000 Subject: [PATCH 10/12] Add test for index creation --- .../ml/integration/MlConfigMigratorIT.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 84f860cd12171..23dd366e8cf04 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -335,6 +335,33 @@ private void addMlConfigIndex(MetaData.Builder metaData, RoutingTable.Builder ro routingTable.add(IndexRoutingTable.builder(index) .addIndexShard(new IndexShardRoutingTable.Builder(shardId).addShard(shardRouting).build())); } + + public void testConfigIndexIsCreated() throws Exception { + // and jobs and datafeeds clusterstate + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + + // if the cluster state has a job config and the index does not + // exist it should be created + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertBusy(() -> { + assertTrue(configIndexExists()); + }); + } + + private boolean configIndexExists() { + return client().admin().indices().prepareExists(AnomalyDetectorsIndex.configIndexName()).get().isExists(); + } } From 3f607d74d8063dcd837ba48f1ca75b24128ddfa6 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 18 Dec 2018 09:26:42 +0000 Subject: [PATCH 11/12] Adjust to rebase --- .../xpack/ml/MlConfigMigrator.java | 27 +++++++++---------- .../ml/integration/MlConfigMigratorIT.java | 13 ++++----- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 19224b002e34f..e3802c52a5f2e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -104,7 +104,6 @@ public class MlConfigMigrator { private final AtomicBoolean migrationInProgress; private final AtomicBoolean tookConfigSnapshot; - private final AtomicBoolean configIndexCreated; public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { this.client = Objects.requireNonNull(client); @@ -112,7 +111,6 @@ public MlConfigMigrator(Settings settings, Client client, ClusterService cluster this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationInProgress = new AtomicBoolean(false); this.tookConfigSnapshot = new AtomicBoolean(false); - this.configIndexCreated = new AtomicBoolean(false); } /** @@ -149,9 +147,13 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); - if (configIndexCreated.get() == false && - clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { + List batches = splitInBatches(clusterState); + if (batches.isEmpty()) { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + return; + } + if (clusterState.metaData().hasIndex(AnomalyDetectorsIndex.configIndexName()) == false) { createConfigIndex(ActionListener.wrap( response -> { unMarkMigrationInProgress.onResponse(Boolean.FALSE); @@ -162,21 +164,16 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { - unMarkMigrationInProgress.onResponse(false); + unMarkMigrationInProgress.onResponse(Boolean.FALSE); return; } snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( - response -> { - // We have successfully snapshotted the ML configs so we don't need to try again - tookConfigSnapshot.set(true); - List batches = splitInBatches(clusterState); - if (batches.isEmpty()) { - unMarkMigrationInProgress.onResponse(Boolean.FALSE); - return; - } - migrateBatches(batches, unMarkMigrationInProgress); - }, + response -> { + // We have successfully snapshotted the ML configs so we don't need to try again + tookConfigSnapshot.set(true); + migrateBatches(batches, unMarkMigrationInProgress); + }, unMarkMigrationInProgress::onFailure )); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 23dd366e8cf04..87c0e4ac824ce 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -193,10 +193,13 @@ public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws Inter mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); } + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) - .build(); + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; @@ -354,9 +357,7 @@ public void testConfigIndexIsCreated() throws Exception { blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), responseHolder, exceptionHolder); - assertBusy(() -> { - assertTrue(configIndexExists()); - }); + assertBusy(() -> assertTrue(configIndexExists())); } private boolean configIndexExists() { From b5c424343894c47a85e47eb1906ce49b5bb49820 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 18 Dec 2018 13:06:43 +0000 Subject: [PATCH 12/12] Move setting --- .../ml/job/persistence/AnomalyDetectorsIndex.java | 2 ++ .../org/elasticsearch/xpack/ml/MachineLearning.java | 3 ++- .../elasticsearch/xpack/ml/MlConfigMigrator.java | 3 +-- .../persistence/DatafeedConfigProvider.java | 9 ++++----- .../xpack/ml/job/persistence/JobConfigProvider.java | 13 ++++++------- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index b7b104e35cdec..673e796ef7e1f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -10,6 +10,8 @@ */ public final class AnomalyDetectorsIndex { + public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000; + private AnomalyDetectorsIndex() { } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index bf48b209f9b38..039bfd175e9f6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -676,7 +676,8 @@ public UnaryOperator> getIndexTemplateMetaDat .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delayedNodeTimeOutSetting) - .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW)) + .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), + AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW)) .version(Version.CURRENT.id) .putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(configMapping)) .build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index e3802c52a5f2e..f2fe80f377649 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -94,7 +94,6 @@ public class MlConfigMigrator { private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class); public static final String MIGRATED_FROM_VERSION = "migrated from version"; - public static final int CONFIG_INDEX_MAX_RESULTS_WINDOW = 10_000; static final int MAX_BULK_WRITE_SIZE = 100; @@ -383,7 +382,7 @@ private void createConfigIndex(ActionListener listener) { Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") - .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), CONFIG_INDEX_MAX_RESULTS_WINDOW) + .put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) ); createIndexRequest.mapping(ElasticsearchMappings.DOC_TYPE, ElasticsearchMappings.configMapping()); } catch (Exception e) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index eb4b80c185a2b..2e620204e228a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -53,7 +53,6 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; -import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import java.io.IOException; @@ -79,7 +78,7 @@ * datafeed configuration document * * The number of datafeeds returned in a search it limited to - * {@link MlConfigMigrator#CONFIG_INDEX_MAX_RESULTS_WINDOW}. + * {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}. * In most cases we expect 10s or 100s of datafeeds to be defined and * a search for all datafeeds should return all. */ @@ -436,7 +435,7 @@ private SearchRequest buildExpandDatafeedIdsSearch(String expression) { return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); } @@ -461,7 +460,7 @@ public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, A SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds); @@ -517,7 +516,7 @@ public void expandDatafeedConfigsWithoutMissingCheck(String expression, ActionLi SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 587bc3941850b..5fbc58d730528 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -68,7 +68,6 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; -import org.elasticsearch.xpack.ml.MlConfigMigrator; import java.io.IOException; import java.io.InputStream; @@ -92,7 +91,7 @@ * anomaly detector job configuration document * * The number of jobs returned in a search it limited to - * {@link MlConfigMigrator#CONFIG_INDEX_MAX_RESULTS_WINDOW}. + * {@link AnomalyDetectorsIndex#CONFIG_INDEX_MAX_RESULTS_WINDOW}. * In most cases we expect 10s or 100s of jobs to be defined and * a search for all jobs should return all. */ @@ -668,7 +667,7 @@ private SearchRequest makeExpandIdsSearchRequest(String expression, boolean excl return client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); } @@ -694,7 +693,7 @@ public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDe SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoJobs); @@ -753,7 +752,7 @@ public void expandJobsWithoutMissingcheck(String expression, boolean excludeDele SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, @@ -799,7 +798,7 @@ public void expandGroupIds(List groupIds, ActionListener> listener) { SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setSource(sourceBuilder) - .setSize(MlConfigMigrator.CONFIG_INDEX_MAX_RESULTS_WINDOW) + .setSize(AnomalyDetectorsIndex.CONFIG_INDEX_MAX_RESULTS_WINDOW) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,