diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java index 83bbe79a7b470..47c0d4f64f96f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java @@ -58,6 +58,11 @@ public static ElasticsearchStatusException badRequestException(String msg, Objec return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args); } + public static ElasticsearchStatusException configHasNotBeenMigrated(String verb, String id) { + return new ElasticsearchStatusException("cannot {} as the configuration [{}] is temporarily pending migration", + RestStatus.SERVICE_UNAVAILABLE, verb, id); + } + /** * Creates an error message that explains there are shard failures, displays info * for the first failure (shard/reason) and kindly asks to see more info in the logs diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 9479d1281d6de..355ad19c8fc98 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -7,7 +7,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -82,29 +81,23 @@ public void clusterChanged(ClusterChangedEvent event) { } PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (Objects.equals(previous, current)) { - return; - } - - Version minNodeVersion = event.state().nodes().getMinNodeVersion(); - if (minNodeVersion.onOrAfter(Version.V_6_6_0)) { - // ok to migrate - mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( - response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())), - e -> { - logger.error("error migrating ml configurations", e); - threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); - } - )); - } else { - threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); - } + mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( + response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())), + e -> { + logger.error("error migrating ml configurations", e); + threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); + } + )); } private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous, ClusterState state) { + if (Objects.equals(previous, current)) { + return; + } + for (PersistentTask currentTask : current.tasks()) { Assignment currentAssignment = currentTask.getAssignment(); PersistentTask previousTask = previous != null ? previous.getTask(currentTask.getId()) : null; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 22dc43b9326fb..ea7bb356f1581 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -75,6 +75,7 @@ public class MlConfigMigrator { private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class); public static final String MIGRATED_FROM_VERSION = "migrated from version"; + public static final Version MIN_NODE_VERSION = Version.V_6_6_0; private final Client client; private final ClusterService clusterService; @@ -106,11 +107,19 @@ public MlConfigMigrator(Client client, ClusterService clusterService) { */ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { + Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); + if (minNodeVersion.before(MIN_NODE_VERSION)) { + listener.onResponse(Boolean.FALSE); + return; + } + if (migrationInProgress.compareAndSet(false, true) == false) { listener.onResponse(Boolean.FALSE); return; } + logger.debug("migrating ml configurations"); + Collection datafeedsToMigrate = stoppedDatafeedConfigs(clusterState); List jobsToMigrate = nonDeletingJobs(closedJobConfigs(clusterState)).stream() .map(MlConfigMigrator::updateJobForMigration) @@ -381,4 +390,60 @@ static List filterFailedDatafeedConfigWrites(Set failedDocumentI .filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false) .collect(Collectors.toList()); } + + /** + * Is the job a eligible for migration? Returns: + * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} + * False if the job is not in the cluster state + * False if the {@link Job#isDeleting()} + * False if the job has a persistent task + * True otherwise i.e. the job is present, not deleting + * and does not have a persistent task. + * + * @param jobId The job Id + * @param clusterState clusterstate + * @return A boolean depending on the conditions listed above + */ + public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) { + Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); + if (minNodeVersion.before(MIN_NODE_VERSION)) { + return false; + } + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + Job job = mlMetadata.getJobs().get(jobId); + + if (job == null || job.isDeleting()) { + return false; + } + + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + return MlTasks.openJobIds(persistentTasks).contains(jobId) == false; + } + + /** + * Is the datafeed a eligible for migration? Returns: + * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} + * False if the datafeed is not in the cluster state + * False if the datafeed has a persistent task + * True otherwise i.e. the datafeed is present and does not have a persistent task. + * + * @param datafeedId The datafeed Id + * @param clusterState clusterstate + * @return A boolean depending on the conditions listed above + */ + public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) { + Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); + if (minNodeVersion.before(MIN_NODE_VERSION)) { + return false; + } + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) { + return false; + } + + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false; + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index a7dbb9d4f93b6..58e79c1ab11dc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -72,6 +73,12 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state, ActionListener listener) { + + if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) { + listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId())); + return; + } + if (request.isForce()) { forceDeleteDatafeed(request, state, listener); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 9eb526e529a37..5dc9d04d6853b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -64,6 +64,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; @@ -147,6 +148,12 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta @Override protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state, ActionListener listener) { + + if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) { + listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId())); + return; + } + logger.debug("Deleting job '{}'", request.getJobId()); if (request.isForce() == false) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 23c45c4c69c27..363c016360be2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -70,6 +70,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -528,6 +529,11 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste @Override protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener listener) { + if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) { + listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId())); + return; + } + OpenJobAction.JobParams jobParams = request.getJobParams(); if (licenseState.isMachineLearningAllowed()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index c5096c966a405..1522c04d8f977 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -68,6 +69,11 @@ protected RevertModelSnapshotAction.Response newResponse() { @Override protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state, ActionListener listener) { + if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) { + listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", request.getJobId())); + return; + } + logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}", request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index f76da8c1c0a1d..55270691661bd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; @@ -139,6 +140,11 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState return; } + if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getParams().getDatafeedId(), state)) { + listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("start datafeed", request.getParams().getDatafeedId())); + return; + } + AtomicReference datafeedConfigHolder = new AtomicReference<>(); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 8ae6afdb0578c..cd918b7666059 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -69,6 +70,11 @@ protected PutDatafeedAction.Response newResponse() { protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state, ActionListener listener) throws Exception { + if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) { + listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId())); + return; + } + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null; if (datafeedConfigIsInClusterState) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 47d26f0b887fb..38cdbffc299f2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; @@ -436,7 +437,13 @@ public void onFailure(Exception e) { } public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); + ClusterState clusterState = clusterService.state(); + if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), clusterState)) { + actionListener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update job", request.getJobId())); + return; + } + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) { @@ -466,6 +473,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener actionListener) { + if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) { updateJobClusterState(request, actionListener); } else { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index f6754ff7d039d..b77ed582709ca 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -139,38 +139,4 @@ public void testClusterChanged_noPersistentTaskChanges() { notifier.offMaster(); verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); } - - public void testMigrateNotTriggered_GivenPre66Nodes() { - MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); - notifier.onMaster(); - - ClusterState previous = ClusterState.builder(new ClusterName("_name")) - .build(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id", null, null, tasksBuilder); - MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); - - // mixed 6.5 and 6.6 nodes - ClusterState current = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .metaData(metaData) - .build(); - - notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous)); - verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); - - current = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .metaData(metaData) - .build(); - - // all 6.6 nodes - notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous)); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any()); - } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java index faaf4425dfb02..435610acab18d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -11,6 +11,9 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -21,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -211,6 +215,123 @@ public void testRemoveJobsAndDatafeeds_removeSome() { assertThat(removalResult.removedDatafeedIds, empty()); } + public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { + // mixed 6.5 and 6.6 nodes + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .build(); + + assertFalse(MlConfigMigrator.jobIsEligibleForMigration("pre-min-version", clusterState)); + } + + public void testJobIsEligibleForMigration_givenJobNotInClusterState() { + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); + assertFalse(MlConfigMigrator.jobIsEligibleForMigration("not-in-state", clusterState)); + } + + public void testJobIsEligibleForMigration_givenDeletingJob() { + Job deletingJob = JobTests.buildJobBuilder("deleting-job").setDeleting(true).build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(deletingJob, false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(deletingJob.getId()), + MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(deletingJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + assertFalse(MlConfigMigrator.jobIsEligibleForMigration(deletingJob.getId(), clusterState)); + } + + public void testJobIsEligibleForMigration_givenOpenJob() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + assertFalse(MlConfigMigrator.jobIsEligibleForMigration(openJob.getId(), clusterState)); + } + + public void testJobIsEligibleForMigration_givenClosedJob() { + Job closedJob = JobTests.buildJobBuilder("closed-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + ) + .build(); + + assertTrue(MlConfigMigrator.jobIsEligibleForMigration(closedJob.getId(), clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenNodesNotUpToVersion() { + // mixed 6.5 and 6.6 nodes + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) + .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) + .build(); + + assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration("pre-min-version", clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenDatafeedNotInClusterState() { + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); + assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration("not-in-state", clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenStartedDatafeed() { + Job openJob = JobTests.buildJobBuilder("open-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); + mlMetadata.putDatafeed(createCompatibleDatafeed(openJob.getId()), Collections.emptyMap()); + String datafeedId = "df-" + openJob.getId(); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, + new StartDatafeedAction.DatafeedParams(datafeedId, 0L), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) + ) + .build(); + + assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration(datafeedId, clusterState)); + } + + public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { + Job job = JobTests.buildJobBuilder("closed-job").build(); + MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(job, false); + mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); + String datafeedId = "df-" + job.getId(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build()) + ) + .build(); + + assertTrue(MlConfigMigrator.datafeedIsEligibleForMigration(datafeedId, clusterState)); + } private DatafeedConfig createCompatibleDatafeed(String jobId) { // create a datafeed without aggregations or anything diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 484008aa2b6a8..73a048d3fe3c0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -32,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; @@ -854,6 +856,24 @@ public void testNotifyFilterChangedGivenOnlyRemovedItems() throws IOException { Mockito.verifyNoMoreInteractions(auditor, updateJobProcessNotifier); } + public void testUpdateJob_notAllowedPreMigration() { + MlMetadata.Builder mlmetadata = new MlMetadata.Builder().putJob(buildJobBuilder("closed-job-not-migrated").build(), false); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlmetadata.build())) + .build(); + when(clusterService.state()).thenReturn(clusterState); + + JobManager jobManager = createJobManager(new MockClientBuilder("jobmanager-test").build()); + jobManager.updateJob(new UpdateJobAction.Request("closed-job-not-migrated", null), ActionListener.wrap( + response -> fail("response not expected: " + response), + exception -> { + assertThat(exception, instanceOf(ElasticsearchStatusException.class)); + } + )); + + } + public void testUpdateProcessOnCalendarChanged() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job-1", "node_id", JobState.OPENED, tasksBuilder); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java index 3b72674ed0751..2eb10de4e6832 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.upgrades; +import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -68,6 +69,14 @@ public static ClusterType parse(String value) { } protected static final ClusterType CLUSTER_TYPE = ClusterType.parse(System.getProperty("tests.rest.suite")); + protected static final Version UPGRADED_FROM_VERSION; + static { + String versionProperty = System.getProperty("tests.upgrade_from_version"); + if (versionProperty == null) { + throw new IllegalStateException("System property 'tests.upgrade_from_version' not set, cannot start tests"); + } + UPGRADED_FROM_VERSION = Version.fromString(versionProperty); + } @Override protected Settings restClientSettings() { diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java new file mode 100644 index 0000000000000..9c0d39f1b0aa9 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java @@ -0,0 +1,576 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isEmptyOrNullString; +import static org.hamcrest.Matchers.not; + +public class MlMigrationIT extends AbstractUpgradeTestCase { + + private static final String OLD_CLUSTER_OPEN_JOB_ID = "ml-migration-it-old-cluster-open-job"; + private static final String OLD_CLUSTER_STARTED_DATAFEED_ID = "ml-migration-it-old-cluster-started-datafeed"; + private static final String OLD_CLUSTER_CLOSED_JOB_ID = "ml-migration-it-old-cluster-closed-job"; + private static final String OLD_CLUSTER_STOPPED_DATAFEED_ID = "ml-migration-it-old-cluster-stopped-datafeed"; + private static final String OLD_CLUSTER_CLOSED_JOB_EXTRA_ID = "ml-migration-it-old-cluster-closed-job-extra"; + private static final String OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID = "ml-migration-it-old-cluster-stopped-datafeed-extra"; + + @Override + protected Collection templatesToWaitFor() { + List templatesToWaitFor = XPackRestTestHelper.ML_POST_V660_TEMPLATES; + + // If upgrading from a version prior to v6.6.0 the set of templates + // to wait for is different + if (CLUSTER_TYPE == ClusterType.OLD) { + if (UPGRADED_FROM_VERSION.before(Version.V_6_6_0)) { + templatesToWaitFor = XPackRestTestHelper.ML_PRE_V660_TEMPLATES; + } + } + + return templatesToWaitFor; + } + + private void waitForClusterHealth() throws IOException { + switch (CLUSTER_TYPE) { + case OLD: + case MIXED: + Request waitForYellow = new Request("GET", "/_cluster/health"); + waitForYellow.addParameter("wait_for_nodes", "3"); + waitForYellow.addParameter("wait_for_status", "yellow"); + client().performRequest(waitForYellow); + break; + case UPGRADED: + Request waitForGreen = new Request("GET", "/_cluster/health"); + waitForGreen.addParameter("wait_for_nodes", "3"); + waitForGreen.addParameter("wait_for_status", "green"); + // wait for long enough that we give delayed unassigned shards to stop being delayed + waitForGreen.addParameter("timeout", "70s"); + waitForGreen.addParameter("level", "shards"); + client().performRequest(waitForGreen); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void createTestIndex() throws IOException { + Request createTestIndex = new Request("PUT", "/airline-responsetime-data"); + createTestIndex.setJsonEntity("{\"mappings\": { \"doc\": {\"properties\": {" + + "\"time\": {\"type\": \"date\"}," + + "\"airline\": {\"type\": \"keyword\"}," + + "\"responsetime\": {\"type\": \"float\"}" + + "}}}}"); + client().performRequest(createTestIndex); + } + + public void testConfigMigration() throws Exception { + if (UPGRADED_FROM_VERSION.onOrAfter(Version.V_6_6_0)) { + // We are testing migration of ml config defined in the clusterstate + // in versions before V6.6.0. There is no point testing later versions + // as the config will be written to index documents + logger.info("Testing migration of ml config in version [" + UPGRADED_FROM_VERSION + "] is a no-op"); + return; + } + + waitForClusterHealth(); + + switch (CLUSTER_TYPE) { + case OLD: + createTestIndex(); + oldClusterTests(); + break; + case MIXED: + mixedClusterTests(); + break; + case UPGRADED: + upgradedClusterTests(); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void oldClusterTests() throws IOException { + // create jobs and datafeeds + Detector.Builder d = new Detector.Builder("metric", "responsetime"); + d.setByFieldName("airline"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10)); + Job.Builder openJob = new Job.Builder(OLD_CLUSTER_OPEN_JOB_ID); + openJob.setAnalysisConfig(analysisConfig); + openJob.setDataDescription(new DataDescription.Builder()); + + Request putOpenJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); + putOpenJob.setJsonEntity(Strings.toString(openJob)); + client().performRequest(putOpenJob); + + Request openOpenJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_open"); + client().performRequest(openOpenJob); + + DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID); + if (UPGRADED_FROM_VERSION.before(Version.V_6_6_0)) { + dfBuilder.setDelayedDataCheckConfig(null); + } + dfBuilder.setIndices(Collections.singletonList("airline-responsetime-data")); + dfBuilder.setTypes(Collections.singletonList("doc")); + + Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); + putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build())); + client().performRequest(putDatafeed); + + Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_start"); + client().performRequest(startDatafeed); + + Job.Builder closedJob = new Job.Builder(OLD_CLUSTER_CLOSED_JOB_ID); + closedJob.setAnalysisConfig(analysisConfig); + closedJob.setDataDescription(new DataDescription.Builder()); + + Request putClosedJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID); + putClosedJob.setJsonEntity(Strings.toString(closedJob)); + client().performRequest(putClosedJob); + + DatafeedConfig.Builder stoppedDfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_CLOSED_JOB_ID); + if (UPGRADED_FROM_VERSION.before(Version.V_6_6_0)) { + stoppedDfBuilder.setDelayedDataCheckConfig(null); + } + stoppedDfBuilder.setIndices(Collections.singletonList("airline-responsetime-data")); + + Request putStoppedDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_ID); + putStoppedDatafeed.setJsonEntity(Strings.toString(stoppedDfBuilder.build())); + client().performRequest(putStoppedDatafeed); + + Job.Builder extraJob = new Job.Builder(OLD_CLUSTER_CLOSED_JOB_EXTRA_ID); + extraJob.setAnalysisConfig(analysisConfig); + extraJob.setDataDescription(new DataDescription.Builder()); + + Request putExtraJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_EXTRA_ID); + putExtraJob.setJsonEntity(Strings.toString(extraJob)); + client().performRequest(putExtraJob); + + putStoppedDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID); + stoppedDfBuilder.setId(OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID); + stoppedDfBuilder.setJobId(OLD_CLUSTER_CLOSED_JOB_EXTRA_ID); + putStoppedDatafeed.setJsonEntity(Strings.toString(stoppedDfBuilder.build())); + client().performRequest(putStoppedDatafeed); + + assertConfigInClusterState(); + } + + private void mixedClusterTests() throws Exception { + assertConfigInClusterState(); + checkJobs(); + checkDatafeeds(); + + // the job and datafeed left open during upgrade should + // be assigned to a node + waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID); + waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID); + } + + private void upgradedClusterTests() throws Exception { + tryUpdates(); + + // These requests may fail because the configs have not been migrated + // and open is disallowed prior to migration. + boolean jobOpened = openMigratedJob(OLD_CLUSTER_CLOSED_JOB_ID); + boolean datafeedStarted = false; + if (jobOpened) { + datafeedStarted = startMigratedDatafeed(OLD_CLUSTER_STOPPED_DATAFEED_ID); + } + + waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID), + Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID), + Collections.singletonList(OLD_CLUSTER_OPEN_JOB_ID), + Collections.singletonList(OLD_CLUSTER_STARTED_DATAFEED_ID)); + + // the job and datafeed left open during upgrade should + // be assigned to a node + waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID); + waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID); + + // Now config is definitely migrated open job and datafeed + // if the previous attempts failed + if (jobOpened == false) { + assertTrue(openMigratedJob(OLD_CLUSTER_CLOSED_JOB_ID)); + } + if (datafeedStarted == false) { + assertTrue(startMigratedDatafeed(OLD_CLUSTER_STOPPED_DATAFEED_ID)); + } + waitForJobToBeAssigned(OLD_CLUSTER_CLOSED_JOB_ID); + waitForDatafeedToBeAssigned(OLD_CLUSTER_STOPPED_DATAFEED_ID); + + // close the job left open during upgrade + Request stopDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_stop"); + client().performRequest(stopDatafeed); + + Request closeJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close"); + client().performRequest(closeJob); + + // now all jobs should be migrated + waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), + Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID), + Collections.emptyList(), + Collections.emptyList()); + + checkJobsMarkedAsMigrated(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID)); + + Request deleteDatafeed = new Request("DELETE", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); + client().performRequest(deleteDatafeed); + Request deleteJob = new Request("DELETE", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); + client().performRequest(deleteJob); + } + + @SuppressWarnings("unchecked") + private void checkJobs() throws IOException { + // Wildcard expansion of jobs and datafeeds was added in 6.1.0 + if (UPGRADED_FROM_VERSION.before(Version.V_6_1_0) && CLUSTER_TYPE != ClusterType.UPGRADED) { + return; + } + + Request getJobs = new Request("GET", "_xpack/ml/anomaly_detectors/migration*"); + Response response = client().performRequest(getJobs); + + Map jobs = entityAsMap(response); + List> jobConfigs = + (List>) XContentMapValues.extractValue("jobs", jobs); + + assertThat(jobConfigs, hasSize(3)); + assertEquals(OLD_CLUSTER_CLOSED_JOB_ID, jobConfigs.get(0).get("job_id")); + assertEquals(OLD_CLUSTER_CLOSED_JOB_EXTRA_ID, jobConfigs.get(1).get("job_id")); + assertEquals(OLD_CLUSTER_OPEN_JOB_ID, jobConfigs.get(2).get("job_id")); + + Map customSettings = (Map)jobConfigs.get(0).get("custom_settings"); + if (customSettings != null) { + assertNull(customSettings.get("migrated from version")); + } + customSettings = (Map)jobConfigs.get(1).get("custom_settings"); + if (customSettings != null) { + assertNull(customSettings.get("migrated from version")); + } + + Request getJobStats = new Request("GET", "_xpack/ml/anomaly_detectors/migration*/_stats"); + response = client().performRequest(getJobStats); + + Map stats = entityAsMap(response); + List> jobStats = + (List>) XContentMapValues.extractValue("jobs", stats); + assertThat(jobStats, hasSize(3)); + + assertEquals(OLD_CLUSTER_CLOSED_JOB_ID, XContentMapValues.extractValue("job_id", jobStats.get(0))); + assertEquals("closed", XContentMapValues.extractValue("state", jobStats.get(0))); + assertThat((String)XContentMapValues.extractValue("assignment_explanation", jobStats.get(0)), isEmptyOrNullString()); + + assertEquals(OLD_CLUSTER_OPEN_JOB_ID, XContentMapValues.extractValue("job_id", jobStats.get(2))); + assertEquals("opened", XContentMapValues.extractValue("state", jobStats.get(2))); + assertThat((String)XContentMapValues.extractValue("assignment_explanation", jobStats.get(2)), isEmptyOrNullString()); + } + + @SuppressWarnings("unchecked") + private void checkDatafeeds() throws IOException { + // Wildcard expansion of jobs and datafeeds was added in 6.1.0 + if (UPGRADED_FROM_VERSION.before(Version.V_6_1_0) && CLUSTER_TYPE != ClusterType.UPGRADED) { + return; + } + + Request getDatafeeds = new Request("GET", "_xpack/ml/datafeeds/migration*"); + Response response = client().performRequest(getDatafeeds); + List> configs = + (List>) XContentMapValues.extractValue("datafeeds", entityAsMap(response)); + assertThat(configs, hasSize(3)); + assertEquals(OLD_CLUSTER_STARTED_DATAFEED_ID, XContentMapValues.extractValue("datafeed_id", configs.get(0))); + assertEquals(OLD_CLUSTER_STOPPED_DATAFEED_ID, XContentMapValues.extractValue("datafeed_id", configs.get(1))); + assertEquals(OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID, XContentMapValues.extractValue("datafeed_id", configs.get(2))); + + Request getDatafeedStats = new Request("GET", "_xpack/ml/datafeeds/migration*/_stats"); + response = client().performRequest(getDatafeedStats); + configs = (List>) XContentMapValues.extractValue("datafeeds", entityAsMap(response)); + assertThat(configs, hasSize(3)); + assertEquals(OLD_CLUSTER_STARTED_DATAFEED_ID, XContentMapValues.extractValue("datafeed_id", configs.get(0))); + assertEquals("started", XContentMapValues.extractValue("state", configs.get(0))); + assertEquals(OLD_CLUSTER_STOPPED_DATAFEED_ID, XContentMapValues.extractValue("datafeed_id", configs.get(1))); + assertEquals("stopped", XContentMapValues.extractValue("state", configs.get(1))); + assertEquals(OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID, XContentMapValues.extractValue("datafeed_id", configs.get(2))); + assertEquals("stopped", XContentMapValues.extractValue("state", configs.get(2))); + } + + @SuppressWarnings("unchecked") + private void checkJobsMarkedAsMigrated(List jobIds) throws IOException { + String requestedIds = String.join(",", jobIds); + Request getJobs = new Request("GET", "_xpack/ml/anomaly_detectors/" + requestedIds); + Response response = client().performRequest(getJobs); + List> jobConfigs = + (List>) XContentMapValues.extractValue("jobs", entityAsMap(response)); + + for (Map config : jobConfigs) { + assertJobIsMarkedAsMigrated(config); + } + } + + @SuppressWarnings("unchecked") + private void assertConfigInClusterState() throws IOException { + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map responseMap = entityAsMap(response); + + List> jobs = + (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); + assertThat(jobs, not(empty())); + Optional job = jobs.stream().map(map -> map.get("job_id")).filter(id -> id.equals(OLD_CLUSTER_OPEN_JOB_ID)).findFirst(); + assertTrue(job.isPresent()); + job = jobs.stream().map(map -> map.get("job_id")).filter(id -> id.equals(OLD_CLUSTER_CLOSED_JOB_ID)).findFirst(); + assertTrue(job.isPresent()); + + List> datafeeds = + (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); + assertNotNull(datafeeds); + assertThat(datafeeds, not(empty())); + Optional datafeed = datafeeds.stream().map(map -> map.get("datafeed_id")) + .filter(id -> id.equals(OLD_CLUSTER_STARTED_DATAFEED_ID)).findFirst(); + assertTrue(datafeed.isPresent()); + datafeed = datafeeds.stream().map(map -> map.get("datafeed_id")) + .filter(id -> id.equals(OLD_CLUSTER_STOPPED_DATAFEED_ID)).findFirst(); + assertTrue(datafeed.isPresent()); + } + + @SuppressWarnings("unchecked") + private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds, + List unMigratedJobs, List unMigratedDatafeeds) throws Exception { + assertBusy(() -> { + // wait for the eligible configs to be moved from the clusterstate + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map responseMap = entityAsMap(response); + + List> jobs = + (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); + assertNotNull(jobs); + + for (String jobId : expectedMigratedJobs) { + assertJobMigrated(jobId, jobs); + } + + for (String jobId : unMigratedJobs) { + assertJobNotMigrated(jobId, jobs); + } + + List> datafeeds = + (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); + assertNotNull(datafeeds); + + for (String datafeedId : expectedMigratedDatafeeds) { + assertDatafeedMigrated(datafeedId, datafeeds); + } + + for (String datafeedId : unMigratedDatafeeds) { + assertDatafeedNotMigrated(datafeedId, datafeeds); + } + + }, 30, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + private void waitForJobToBeAssigned(String jobId) throws Exception { + assertBusy(() -> { + try { + Request getJobStats = new Request("GET", "_xpack/ml/anomaly_detectors/" + jobId + "/_stats"); + Response response = client().performRequest(getJobStats); + + Map stats = entityAsMap(response); + List> jobStats = + (List>) XContentMapValues.extractValue("jobs", stats); + + assertEquals(jobId, XContentMapValues.extractValue("job_id", jobStats.get(0))); + assertEquals("opened", XContentMapValues.extractValue("state", jobStats.get(0))); + assertThat((String)XContentMapValues.extractValue("assignment_explanation", jobStats.get(0)), isEmptyOrNullString()); + assertNotNull(XContentMapValues.extractValue("node", jobStats.get(0))); + } catch (IOException e) { + + } + }, 30, TimeUnit.SECONDS); + } + + @SuppressWarnings("unchecked") + private void waitForDatafeedToBeAssigned(String datafeedId) throws Exception { + assertBusy(() -> { + Request getDatafeedStats = new Request("GET", "_xpack/ml/datafeeds/" + datafeedId + "/_stats"); + Response response = client().performRequest(getDatafeedStats); + Map stats = entityAsMap(response); + List> datafeedStats = + (List>) XContentMapValues.extractValue("datafeeds", stats); + + assertEquals(datafeedId, XContentMapValues.extractValue("datafeed_id", datafeedStats.get(0))); + assertEquals("started", XContentMapValues.extractValue("state", datafeedStats.get(0))); + assertThat((String) XContentMapValues.extractValue("assignment_explanation", datafeedStats.get(0)), isEmptyOrNullString()); + assertNotNull(XContentMapValues.extractValue("node", datafeedStats.get(0))); + }, 30, TimeUnit.SECONDS); + } + + private boolean openMigratedJob(String jobId) throws IOException { + // opening a job should be rejected prior to migration + Request openJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + jobId + "/_open"); + return updateJobExpectingSuccessOr503(jobId, openJob, "cannot open job as the configuration [" + + jobId + "] is temporarily pending migration", false); + } + + private boolean startMigratedDatafeed(String datafeedId) throws IOException { + Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + datafeedId + "/_start"); + return updateDatafeedExpectingSuccessOr503(datafeedId, startDatafeed, "cannot start datafeed as the configuration [" + + datafeedId + "] is temporarily pending migration", false); + } + + private void tryUpdates() throws IOException { + // in the upgraded cluster updates should be rejected prior + // to migration. Either the config is migrated or the update + // is rejected with the expected error + + // delete datafeed + Request deleteDatafeed = new Request("DELETE", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID); + boolean datafeedDeleted = updateDatafeedExpectingSuccessOr503(OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID, deleteDatafeed, + "cannot delete datafeed as the configuration [" + OLD_CLUSTER_STOPPED_DATAFEED_EXTRA_ID + + "] is temporarily pending migration", true); + + if (datafeedDeleted && randomBoolean()) { + // delete job if the datafeed that refers to it was deleted + // otherwise the request is invalid + Request deleteJob = new Request("DELETE", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_EXTRA_ID); + updateJobExpectingSuccessOr503(OLD_CLUSTER_CLOSED_JOB_EXTRA_ID, deleteJob, "cannot update job as the configuration [" + + OLD_CLUSTER_CLOSED_JOB_EXTRA_ID + "] is temporarily pending migration", true); + } else { + // update job + Request updateJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_EXTRA_ID + "/_update"); + updateJob.setJsonEntity("{\"description\" : \"updated description\"}"); + updateJobExpectingSuccessOr503(OLD_CLUSTER_CLOSED_JOB_EXTRA_ID, updateJob, "cannot update job as the configuration [" + + OLD_CLUSTER_CLOSED_JOB_EXTRA_ID + "] is temporarily pending migration", false); + } + + + } + + @SuppressWarnings("unchecked") + private boolean updateJobExpectingSuccessOr503(String jobId, Request request, + String expectedErrorMessage, boolean deleting) throws IOException { + try { + client().performRequest(request); + + // the request was successful so the job should have been migrated + // ...unless it was deleted + if (deleting) { + return true; + } + + Request getJob = new Request("GET", "_xpack/ml/anomaly_detectors/" + jobId); + Response response = client().performRequest(getJob); + List> jobConfigs = + (List>) XContentMapValues.extractValue("jobs", entityAsMap(response)); + assertJobIsMarkedAsMigrated(jobConfigs.get(0)); + return true; + } catch (ResponseException e) { + // a fail request is ok if the error was that the config has not been migrated + assertThat(e.getMessage(), containsString(expectedErrorMessage)); + assertEquals(503, e.getResponse().getStatusLine().getStatusCode()); + return false; + } + } + + @SuppressWarnings("unchecked") + private boolean updateDatafeedExpectingSuccessOr503(String datafeedId, Request request, + String expectedErrorMessage, boolean deleting) throws IOException { + // starting a datafeed should be rejected prior to migration + try { + client().performRequest(request); + + // the request was successful so the job should have been migrated + // ...unless it was deleted + if (deleting) { + return true; + } + + // if the request succeeded the config must have been migrated out of clusterstate + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map clusterStateMap = entityAsMap(response); + List> datafeeds = + (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", clusterStateMap); + assertDatafeedMigrated(datafeedId, datafeeds); + return true; + } catch (ResponseException e) { + // a fail request is ok if the error was that the config has not been migrated + assertThat(e.getMessage(), containsString(expectedErrorMessage)); + assertEquals(503, e.getResponse().getStatusLine().getStatusCode()); + return false; + } + } + + @SuppressWarnings("unchecked") + private void assertJobIsMarkedAsMigrated(Map job) { + Map customSettings = (Map)job.get("custom_settings"); + assertThat(customSettings.keySet(), contains("migrated from version")); + assertEquals(UPGRADED_FROM_VERSION.toString(), customSettings.get("migrated from version").toString()); + } + + private void assertDatafeedMigrated(String datafeedId, List> datafeeds) { + assertDatafeed(datafeedId, datafeeds, false); + } + + private void assertDatafeedNotMigrated(String datafeedId, List> datafeeds) { + assertDatafeed(datafeedId, datafeeds, true); + } + + private void assertDatafeed(String datafeedId, List> datafeeds, boolean expectedToBePresent) { + Optional config = datafeeds.stream().map(map -> map.get("datafeed_id")) + .filter(id -> id.equals(datafeedId)).findFirst(); + if (expectedToBePresent) { + assertTrue(config.isPresent()); + } else { + assertFalse(config.isPresent()); + } + } + + private void assertJobMigrated(String jobId, List> jobs) { + assertJob(jobId, jobs, false); + } + + private void assertJobNotMigrated(String jobId, List> jobs) { + assertJob(jobId, jobs, true); + } + + private void assertJob(String jobId, List> jobs, boolean expectedToBePresent) { + Optional config = jobs.stream().map(map -> map.get("job_id")) + .filter(id -> id.equals(jobId)).findFirst(); + if (expectedToBePresent) { + assertTrue(config.isPresent()); + } else { + assertFalse(config.isPresent()); + } + } + +} diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml index 6e06d2d4db827..b076828fc856e 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/60_ml_config_migration.yml @@ -21,8 +21,6 @@ - is_false: jobs.0.node - match: { jobs.1.job_id: migration-old-cluster-open-job} - match: { jobs.1.state: opened } -# TODO can't test for assignment here as the job may not be re-allocated yet -# - is_true: jobs.1.node - is_false: jobs.1.assignment_explanation - do: @@ -39,8 +37,6 @@ datafeed_id: migration* - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed} - match: { datafeeds.0.state: started } -# TODO can't test for assignment here as the datafeed may not be re-allocated yet -# - is_true: datafeeds.0.node - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed} - match: { datafeeds.1.state: stopped } - is_false: datafeeds.1.node diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml index 227005185fc0d..be36d7358e794 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml @@ -25,8 +25,6 @@ setup: - is_false: jobs.0.node - match: { jobs.1.job_id: migration-old-cluster-open-job } - match: { jobs.1.state: opened } -# TODO can't test for assignment here as the job may not be re-allocated yet -# - is_true: jobs.1.node - is_false: jobs.1.assignment_explanation - do: @@ -43,8 +41,6 @@ setup: datafeed_id: migration* - match: { datafeeds.0.datafeed_id: migration-old-cluster-started-datafeed } - match: { datafeeds.0.state: started } -# TODO can't test for assignment here as the datafeed may not be re-allocated yet -# - is_true: datafeeds.0.node - match: { datafeeds.1.datafeed_id: migration-old-cluster-stopped-datafeed } - match: { datafeeds.1.state: stopped } - is_false: datafeeds.1.node @@ -61,35 +57,3 @@ setup: xpack.ml.get_jobs: job_id: migration-old-cluster-open-job - is_true: jobs.0.finished_time - -# TODO cannot test delete as config may be migrating -# - do: -# xpack.ml.delete_datafeed: -# datafeed_id: migration-old-cluster-started-datafeed -# -# - do: -# xpack.ml.delete_job: -# job_id: migration-old-cluster-open-job -# -# - do: -# catch: missing -# xpack.ml.get_jobs: -# job_id: migration-old-cluster-open-job -# -# - do: -# xpack.ml.delete_datafeed: -# datafeed_id: migration-old-cluster-stopped-datafeed -# -# - do: -# xpack.ml.delete_job: -# job_id: migration-old-cluster-closed-job -# -# - do: -# xpack.ml.get_jobs: -# job_id: migration* -# - match: { count: 0 } -# -# - do: -# xpack.ml.get_datafeeds: -# datafeed_id: migration* -# - match: { count: 0 }