From 04cc9833baa260b829357a66d88b6ee73848821c Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 2 Jan 2019 17:05:05 +0000 Subject: [PATCH] Don't run migration checks againts v6.6.0 clusters --- .../MlMigrationFullClusterRestartIT.java | 56 ++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java index 36857635951e2..56696021ffe55 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/MlMigrationFullClusterRestartIT.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.List; @@ -35,6 +36,8 @@ public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartTestCase { + private static final String OLD_CLUSTER_OPEN_JOB_ID = "migration-old-cluster-open-job"; + private static final String OLD_CLUSTER_STARTED_DATAFEED_ID = "migration-old-cluster-started-datafeed"; private static final String OLD_CLUSTER_CLOSED_JOB_ID = "migration-old-cluster-closed-job"; private static final String OLD_CLUSTER_STOPPED_DATAFEED_ID = "migration-old-cluster-stopped-datafeed"; @@ -69,7 +72,6 @@ private void createTestIndex() throws IOException { client().performRequest(createTestIndex); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36816") public void testMigration() throws Exception { if (isRunningAgainstOldCluster()) { createTestIndex(); @@ -86,6 +88,31 @@ private void oldClusterTests() throws IOException { AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10)); + Job.Builder openJob = new Job.Builder(OLD_CLUSTER_OPEN_JOB_ID); + openJob.setAnalysisConfig(analysisConfig); + openJob.setDataDescription(new DataDescription.Builder()); + + Request putOpenJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); + putOpenJob.setJsonEntity(Strings.toString(openJob)); + client().performRequest(putOpenJob); + + Request openOpenJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_open"); + client().performRequest(openOpenJob); + + DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID); + if (getOldClusterVersion().before(Version.V_6_6_0)) { + dfBuilder.setDelayedDataCheckConfig(null); + } + dfBuilder.setIndices(Collections.singletonList("airline-data")); + dfBuilder.setTypes(Collections.singletonList("doc")); + + Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); + putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build())); + client().performRequest(putDatafeed); + + Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_start"); + client().performRequest(startDatafeed); + Job.Builder closedJob = new Job.Builder(OLD_CLUSTER_CLOSED_JOB_ID); closedJob.setAnalysisConfig(analysisConfig); closedJob.setDataDescription(new DataDescription.Builder()); @@ -109,7 +136,13 @@ private void upgradedClusterTests() throws Exception { // wait for the closed job and datafeed to be migrated waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID), Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID), - Collections.emptyList(), Collections.emptyList()); + Collections.singletonList(OLD_CLUSTER_OPEN_JOB_ID), + Collections.singletonList(OLD_CLUSTER_STARTED_DATAFEED_ID)); + + // the job and datafeed left open during upgrade should + // be assigned to a node + waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID); + waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID); // open the migrated job and datafeed Request openJob = new Request("POST", "_ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID + "/_open"); @@ -119,6 +152,19 @@ private void upgradedClusterTests() throws Exception { waitForJobToBeAssigned(OLD_CLUSTER_CLOSED_JOB_ID); waitForDatafeedToBeAssigned(OLD_CLUSTER_STOPPED_DATAFEED_ID); + + // stop the datafeed and close the job left open during upgrade + Request stopDatafeed = new Request("POST", "_ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_stop"); + client().performRequest(stopDatafeed); + + Request closeJob = new Request("POST", "_ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close"); + client().performRequest(closeJob); + + // now all jobs should be migrated + waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), + Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID), + Collections.emptyList(), + Collections.emptyList()); } @SuppressWarnings("unchecked") @@ -157,6 +203,12 @@ private void waitForDatafeedToBeAssigned(String datafeedId) throws Exception { @SuppressWarnings("unchecked") private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds, List unMigratedJobs, List unMigratedDatafeeds) throws Exception { + + // After v6.6.0 jobs are created in the index so no migration will take place + if (getOldClusterVersion().onOrAfter(Version.V_6_6_0)) { + return; + } + assertBusy(() -> { // wait for the eligible configs to be moved from the clusterstate Request getClusterState = new Request("GET", "/_cluster/state/metadata");