diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java index f5bcd7d8d53db..47580bf731a44 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java @@ -43,12 +43,6 @@ public final class XPackRestTestHelper { private XPackRestTestHelper() { } - /** - * Waits for the Machine Learning templates to be created - * and check the version is up to date - */ - - /** * For each template name wait for the template to be created and * for the template version to be equal to the master node version. @@ -96,5 +90,4 @@ public static void waitForTemplates(RestClient client, List templateName }); } } - } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 4f88dd38bb439..ba1000135191e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -26,57 +25,39 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; -public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMasterListener { +public class MlAssignmentNotifier implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class); private final Auditor auditor; - private final ClusterService clusterService; private final MlConfigMigrator mlConfigMigrator; private final ThreadPool threadPool; - private final AtomicBoolean enabled = new AtomicBoolean(false); MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) { this.auditor = auditor; - this.clusterService = clusterService; this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService); this.threadPool = threadPool; - clusterService.addLocalNodeMasterListener(this); + clusterService.addListener(this); } MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) { this.auditor = auditor; - this.clusterService = clusterService; this.mlConfigMigrator = mlConfigMigrator; this.threadPool = threadPool; - clusterService.addLocalNodeMasterListener(this); + clusterService.addListener(this); } - @Override - public void onMaster() { - if (enabled.compareAndSet(false, true)) { - clusterService.addListener(this); - } - } - - @Override - public void offMaster() { - if (enabled.compareAndSet(true, false)) { - clusterService.removeListener(this); - } - } - - @Override - public String executorName() { + private String executorName() { return ThreadPool.Names.GENERIC; } @Override public void clusterChanged(ClusterChangedEvent event) { - if (enabled.get() == false) { + + if (event.localNodeMaster() == false) { return; } + if (event.metaDataChanged() == false) { return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 9b3b6a118659e..e17c23da0686e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -9,10 +9,14 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -31,12 +35,14 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -90,12 +96,14 @@ public class MlConfigMigrator { private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private final AtomicBoolean migrationInProgress; + private final AtomicBoolean firstTime; public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationInProgress = new AtomicBoolean(false); + this.firstTime = new AtomicBoolean(true); } /** @@ -127,9 +135,6 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener return; } - - logger.debug("migrating ml configurations"); - Collection stoppedDatafeeds = stoppedDatafeedConfigs(clusterState); Map eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream() .map(MlConfigMigrator::updateJobForMigration) @@ -148,19 +153,36 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); + if (firstTime.get()) { + snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( + response -> { + firstTime.set(false); + migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress); + }, + unMarkMigrationInProgress::onFailure + )); + return; + } + + migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress); + } + + private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener listener) { if (jobsAndDatafeedsToMigrate.totalCount() == 0) { - unMarkMigrationInProgress.onResponse(Boolean.FALSE); + listener.onResponse(Boolean.FALSE); return; } + logger.debug("migrating ml configurations"); + writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap( failedDocumentIds -> { List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs); List successfulDatafeedWrites = filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs); - removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress); + removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener); }, - unMarkMigrationInProgress::onFailure + listener::onFailure )); } @@ -300,6 +322,45 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To return indexRequest; } + + // public for testing + public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listener) { + + if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) { + listener.onResponse(Boolean.TRUE); + return; + } + + logger.debug("taking a snapshot of mlmetadata"); + String documentId = "ml-config"; + IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(), + ElasticsearchMappings.DOC_TYPE, documentId) + .setOpType(DocWriteRequest.OpType.CREATE); + + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.startObject(); + mlMetadata.toXContent(builder, params); + builder.endObject(); + + indexRequest.setSource(builder); + } catch (IOException e) { + logger.error("failed to serialise mlmetadata", e); + listener.onFailure(e); + return; + } + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(), + ActionListener.wrap( + indexResponse -> { + listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED); + }, + listener::onFailure), + client::index + ); + } + + public static Job updateJobForMigration(Job job) { Job.Builder builder = new Job.Builder(job); Map custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index b77ed582709ca..e43197eb06302 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -69,34 +69,39 @@ private void setupMocks() { public void testClusterChanged_info() { MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); - notifier.onMaster(); - DiscoveryNode node = - new DiscoveryNode("node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT); ClusterState previous = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) .build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id", "node_id", null, tasksBuilder); + addJobTask("job_id", "_node_id", null, tasksBuilder); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); - ClusterState state = ClusterState.builder(new ClusterName("_name")) + ClusterState newState = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) - .nodes(DiscoveryNodes.builder().add(node)) + // set local node master + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) .build(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).info(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); - notifier.offMaster(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + // no longer master + newState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))) + .build(); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verifyNoMoreInteractions(auditor); } public void testClusterChanged_warning() { MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); - notifier.onMaster(); ClusterState previous = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, @@ -106,21 +111,31 @@ public void testClusterChanged_warning() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, null, tasksBuilder); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); - ClusterState state = ClusterState.builder(new ClusterName("_name")) + ClusterState newState = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) + // set local node master + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) .build(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).warning(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); - notifier.offMaster(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + // no longer master + newState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) + .build(); + + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verifyNoMoreInteractions(auditor); } public void testClusterChanged_noPersistentTaskChanges() { MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); - notifier.onMaster(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, null, tasksBuilder); @@ -129,14 +144,25 @@ public void testClusterChanged_noPersistentTaskChanges() { .metaData(metaData) .build(); - ClusterState current = ClusterState.builder(new ClusterName("_name")) + ClusterState newState = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) + // set local node master + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) .build(); - notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous)); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); - notifier.offMaster(); + // no longer master + newState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) + .build(); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index b81805fb3fbdf..1dc06e0e2aef6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -13,9 +14,16 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; @@ -23,6 +31,8 @@ import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.junit.Before; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -98,7 +108,7 @@ public void testWriteConfigToIndex() throws InterruptedException { assertNull(alreadyMigratedJob.getCustomSettings()); } - public void testMigrateConfigs() throws InterruptedException { + public void testMigrateConfigs() throws InterruptedException, IOException { // and jobs and datafeeds clusterstate MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); @@ -124,11 +134,13 @@ public void testMigrateConfigs() throws InterruptedException { // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + // the first time this is called mlmetadata will be snap-shotted blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertTrue(responseHolder.get()); + assertSnapshot(mlMetadata.build()); // check the jobs have been migrated AtomicReference> jobsHolder = new AtomicReference<>(); @@ -171,9 +183,9 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) - .build(); + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); AtomicReference exceptionHolder = new AtomicReference<>(); AtomicReference responseHolder = new AtomicReference<>(); @@ -181,7 +193,7 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService); blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), - responseHolder, exceptionHolder); + responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertFalse(responseHolder.get()); @@ -190,7 +202,7 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int AtomicReference> jobsHolder = new AtomicReference<>(); JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), - jobsHolder, exceptionHolder); + jobsHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(jobsHolder.get().isEmpty(), is(true)); @@ -198,11 +210,25 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry()); AtomicReference> datafeedsHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener), - datafeedsHolder, exceptionHolder); + datafeedsHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(datafeedsHolder.get().isEmpty(), is(true)); } + + public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException { + GetResponse getResponse = client() + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, "ml-config").get(); + + assertTrue(getResponse.isExists()); + + try (InputStream stream = getResponse.getSourceAsBytesRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + MlMetadata recoveredMeta = MlMetadata.LENIENT_PARSER.apply(parser, null).build(); + assertEquals(expectedMlMetadata, recoveredMeta); + } + } }