Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
Expand All @@ -35,6 +36,8 @@

public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartTestCase {

private static final String OLD_CLUSTER_OPEN_JOB_ID = "migration-old-cluster-open-job";
private static final String OLD_CLUSTER_STARTED_DATAFEED_ID = "migration-old-cluster-started-datafeed";
private static final String OLD_CLUSTER_CLOSED_JOB_ID = "migration-old-cluster-closed-job";
private static final String OLD_CLUSTER_STOPPED_DATAFEED_ID = "migration-old-cluster-stopped-datafeed";

Expand Down Expand Up @@ -69,7 +72,6 @@ private void createTestIndex() throws IOException {
client().performRequest(createTestIndex);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36816")
public void testMigration() throws Exception {
if (isRunningAgainstOldCluster()) {
createTestIndex();
Expand All @@ -86,6 +88,31 @@ private void oldClusterTests() throws IOException {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10));

Job.Builder openJob = new Job.Builder(OLD_CLUSTER_OPEN_JOB_ID);
openJob.setAnalysisConfig(analysisConfig);
openJob.setDataDescription(new DataDescription.Builder());

Request putOpenJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID);
putOpenJob.setJsonEntity(Strings.toString(openJob));
client().performRequest(putOpenJob);

Request openOpenJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_open");
client().performRequest(openOpenJob);

DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID);
if (getOldClusterVersion().before(Version.V_6_6_0)) {
dfBuilder.setDelayedDataCheckConfig(null);
}
dfBuilder.setIndices(Collections.singletonList("airline-data"));
dfBuilder.setTypes(Collections.singletonList("doc"));

Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID);
putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build()));
client().performRequest(putDatafeed);

Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_start");
client().performRequest(startDatafeed);

Job.Builder closedJob = new Job.Builder(OLD_CLUSTER_CLOSED_JOB_ID);
closedJob.setAnalysisConfig(analysisConfig);
closedJob.setDataDescription(new DataDescription.Builder());
Expand All @@ -109,7 +136,13 @@ private void upgradedClusterTests() throws Exception {
// wait for the closed job and datafeed to be migrated
waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID),
Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID),
Collections.emptyList(), Collections.emptyList());
Collections.singletonList(OLD_CLUSTER_OPEN_JOB_ID),
Collections.singletonList(OLD_CLUSTER_STARTED_DATAFEED_ID));

// the job and datafeed left open during upgrade should
// be assigned to a node
waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID);
waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID);

// open the migrated job and datafeed
Request openJob = new Request("POST", "_ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID + "/_open");
Expand All @@ -119,6 +152,19 @@ private void upgradedClusterTests() throws Exception {

waitForJobToBeAssigned(OLD_CLUSTER_CLOSED_JOB_ID);
waitForDatafeedToBeAssigned(OLD_CLUSTER_STOPPED_DATAFEED_ID);

// stop the datafeed and close the job left open during upgrade
Request stopDatafeed = new Request("POST", "_ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_stop");
client().performRequest(stopDatafeed);

Request closeJob = new Request("POST", "_ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close");
client().performRequest(closeJob);

// now all jobs should be migrated
waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID),
Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID),
Collections.emptyList(),
Collections.emptyList());
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -157,6 +203,12 @@ private void waitForDatafeedToBeAssigned(String datafeedId) throws Exception {
@SuppressWarnings("unchecked")
private void waitForMigration(List<String> expectedMigratedJobs, List<String> expectedMigratedDatafeeds,
List<String> unMigratedJobs, List<String> unMigratedDatafeeds) throws Exception {

// After v6.6.0 jobs are created in the index so no migration will take place
if (getOldClusterVersion().onOrAfter(Version.V_6_6_0)) {
return;
}

assertBusy(() -> {
// wait for the eligible configs to be moved from the clusterstate
Request getClusterState = new Request("GET", "/_cluster/state/metadata");
Expand Down