Skip to content

Commit 2e2495c

Browse files
authored
[ML] Full cluster restart tests for migration (#36593)
1 parent 76858bb commit 2e2495c

File tree

3 files changed

+256
-9
lines changed

3 files changed

+256
-9
lines changed

x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,21 @@
1212
import org.elasticsearch.client.ResponseException;
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.common.util.concurrent.ThreadContext;
15+
import org.elasticsearch.common.xcontent.ObjectPath;
1516
import org.elasticsearch.common.xcontent.XContentType;
1617
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1718
import org.elasticsearch.rest.RestStatus;
1819
import org.elasticsearch.test.StreamsUtils;
1920
import org.elasticsearch.test.rest.ESRestTestCase;
2021
import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;
2122
import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder;
22-
import org.elasticsearch.common.xcontent.ObjectPath;
2323
import org.elasticsearch.xpack.security.support.SecurityIndexManager;
24-
import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
2524
import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction;
2625
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
2726
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
2827
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
2928
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
3029
import org.hamcrest.Matcher;
31-
import org.junit.Before;
3230

3331
import java.io.IOException;
3432
import java.nio.charset.StandardCharsets;
@@ -57,11 +55,6 @@
5755

5856
public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
5957

60-
@Before
61-
public void waitForMlTemplates() throws Exception {
62-
XPackRestTestHelper.waitForTemplates(client(), XPackRestTestHelper.ML_PRE_V660_TEMPLATES);
63-
}
64-
6558
@Override
6659
protected Settings restClientSettings() {
6760
String token = "Basic " + Base64.getEncoder().encodeToString("test_user:x-pack-test-password".getBytes(StandardCharsets.UTF_8));
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.restart;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.client.Request;
10+
import org.elasticsearch.client.Response;
11+
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.unit.TimeValue;
14+
import org.elasticsearch.common.util.concurrent.ThreadContext;
15+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
16+
import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;
17+
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
18+
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
19+
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
20+
import org.elasticsearch.xpack.core.ml.job.config.Detector;
21+
import org.elasticsearch.xpack.core.ml.job.config.Job;
22+
import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
23+
import org.junit.Before;
24+
25+
import java.io.IOException;
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.Arrays;
28+
import java.util.Base64;
29+
import java.util.Collections;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Optional;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import static org.hamcrest.Matchers.isEmptyOrNullString;
36+
37+
public class MlMigrationFullClusterRestartIT extends AbstractFullClusterRestartTestCase {
38+
39+
private static final String OLD_CLUSTER_OPEN_JOB_ID = "migration-old-cluster-open-job";
40+
private static final String OLD_CLUSTER_STARTED_DATAFEED_ID = "migration-old-cluster-started-datafeed";
41+
private static final String OLD_CLUSTER_CLOSED_JOB_ID = "migration-old-cluster-closed-job";
42+
private static final String OLD_CLUSTER_STOPPED_DATAFEED_ID = "migration-old-cluster-stopped-datafeed";
43+
44+
@Override
45+
protected Settings restClientSettings() {
46+
String token = "Basic " + Base64.getEncoder().encodeToString("test_user:x-pack-test-password".getBytes(StandardCharsets.UTF_8));
47+
return Settings.builder()
48+
.put(ThreadContext.PREFIX + ".Authorization", token)
49+
.build();
50+
}
51+
52+
@Before
53+
public void waitForMlTemplates() throws Exception {
54+
List<String> templatesToWaitFor = XPackRestTestHelper.ML_POST_V660_TEMPLATES;
55+
56+
// If upgrading from a version prior to v6.6.0 the set of templates
57+
// to wait for is different
58+
if (isRunningAgainstOldCluster() && getOldClusterVersion().before(Version.V_6_6_0) ) {
59+
templatesToWaitFor = XPackRestTestHelper.ML_PRE_V660_TEMPLATES;
60+
}
61+
62+
XPackRestTestHelper.waitForTemplates(client(), templatesToWaitFor);
63+
}
64+
65+
private void createTestIndex() throws IOException {
66+
Request createTestIndex = new Request("PUT", "/airline-data");
67+
createTestIndex.setJsonEntity("{\"mappings\": { \"doc\": {\"properties\": {" +
68+
"\"time\": {\"type\": \"date\"}," +
69+
"\"airline\": {\"type\": \"keyword\"}," +
70+
"\"responsetime\": {\"type\": \"float\"}" +
71+
"}}}}");
72+
client().performRequest(createTestIndex);
73+
}
74+
75+
public void testMigration() throws Exception {
76+
if (isRunningAgainstOldCluster()) {
77+
createTestIndex();
78+
oldClusterTests();
79+
} else {
80+
upgradedClusterTests();
81+
}
82+
}
83+
84+
private void oldClusterTests() throws IOException {
85+
// create jobs and datafeeds
86+
Detector.Builder d = new Detector.Builder("metric", "responsetime");
87+
d.setByFieldName("airline");
88+
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
89+
analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10));
90+
Job.Builder openJob = new Job.Builder(OLD_CLUSTER_OPEN_JOB_ID);
91+
openJob.setAnalysisConfig(analysisConfig);
92+
openJob.setDataDescription(new DataDescription.Builder());
93+
94+
Request putOpenJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID);
95+
putOpenJob.setJsonEntity(Strings.toString(openJob));
96+
client().performRequest(putOpenJob);
97+
98+
Request openOpenJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_open");
99+
client().performRequest(openOpenJob);
100+
101+
DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STARTED_DATAFEED_ID, OLD_CLUSTER_OPEN_JOB_ID);
102+
if (getOldClusterVersion().before(Version.V_6_6_0)) {
103+
dfBuilder.setDelayedDataCheckConfig(null);
104+
}
105+
dfBuilder.setIndices(Collections.singletonList("airline-data"));
106+
dfBuilder.setTypes(Collections.singletonList("doc"));
107+
108+
Request putDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID);
109+
putDatafeed.setJsonEntity(Strings.toString(dfBuilder.build()));
110+
client().performRequest(putDatafeed);
111+
112+
Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_start");
113+
client().performRequest(startDatafeed);
114+
115+
Job.Builder closedJob = new Job.Builder(OLD_CLUSTER_CLOSED_JOB_ID);
116+
closedJob.setAnalysisConfig(analysisConfig);
117+
closedJob.setDataDescription(new DataDescription.Builder());
118+
119+
Request putClosedJob = new Request("PUT", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID);
120+
putClosedJob.setJsonEntity(Strings.toString(closedJob));
121+
client().performRequest(putClosedJob);
122+
123+
DatafeedConfig.Builder stoppedDfBuilder = new DatafeedConfig.Builder(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_CLOSED_JOB_ID);
124+
if (getOldClusterVersion().before(Version.V_6_6_0)) {
125+
stoppedDfBuilder.setDelayedDataCheckConfig(null);
126+
}
127+
stoppedDfBuilder.setIndices(Collections.singletonList("airline-data"));
128+
129+
Request putStoppedDatafeed = new Request("PUT", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_ID);
130+
putStoppedDatafeed.setJsonEntity(Strings.toString(stoppedDfBuilder.build()));
131+
client().performRequest(putStoppedDatafeed);
132+
}
133+
134+
private void upgradedClusterTests() throws Exception {
135+
// wait for the closed job and datafeed to be migrated
136+
waitForMigration(Collections.singletonList(OLD_CLUSTER_CLOSED_JOB_ID),
137+
Collections.singletonList(OLD_CLUSTER_STOPPED_DATAFEED_ID),
138+
Collections.singletonList(OLD_CLUSTER_OPEN_JOB_ID),
139+
Collections.singletonList(OLD_CLUSTER_STARTED_DATAFEED_ID));
140+
141+
// the job and datafeed left open during upgrade should
142+
// be assigned to a node
143+
waitForJobToBeAssigned(OLD_CLUSTER_OPEN_JOB_ID);
144+
waitForDatafeedToBeAssigned(OLD_CLUSTER_STARTED_DATAFEED_ID);
145+
146+
// open the migrated job and datafeed
147+
Request openJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_ID + "/_open");
148+
client().performRequest(openJob);
149+
Request startDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STOPPED_DATAFEED_ID + "/_start");
150+
client().performRequest(startDatafeed);
151+
152+
// close the job left open during upgrade
153+
Request stopDatafeed = new Request("POST", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID + "/_stop");
154+
client().performRequest(stopDatafeed);
155+
156+
Request closeJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close");
157+
client().performRequest(closeJob);
158+
159+
// now all jobs should be migrated
160+
waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID),
161+
Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID),
162+
Collections.emptyList(),
163+
Collections.emptyList());
164+
}
165+
166+
@SuppressWarnings("unchecked")
167+
private void waitForJobToBeAssigned(String jobId) throws Exception {
168+
assertBusy(() -> {
169+
Request getJobStats = new Request("GET", "_xpack/ml/anomaly_detectors/" + jobId + "/_stats");
170+
Response response = client().performRequest(getJobStats);
171+
172+
Map<String, Object> stats = entityAsMap(response);
173+
List<Map<String, Object>> jobStats =
174+
(List<Map<String, Object>>) XContentMapValues.extractValue("jobs", stats);
175+
176+
assertEquals(jobId, XContentMapValues.extractValue("job_id", jobStats.get(0)));
177+
assertEquals("opened", XContentMapValues.extractValue("state", jobStats.get(0)));
178+
assertThat((String) XContentMapValues.extractValue("assignment_explanation", jobStats.get(0)), isEmptyOrNullString());
179+
assertNotNull(XContentMapValues.extractValue("node", jobStats.get(0)));
180+
}, 30, TimeUnit.SECONDS);
181+
}
182+
183+
@SuppressWarnings("unchecked")
184+
private void waitForDatafeedToBeAssigned(String datafeedId) throws Exception {
185+
assertBusy(() -> {
186+
Request getDatafeedStats = new Request("GET", "_xpack/ml/datafeeds/" + datafeedId + "/_stats");
187+
Response response = client().performRequest(getDatafeedStats);
188+
Map<String, Object> stats = entityAsMap(response);
189+
List<Map<String, Object>> datafeedStats =
190+
(List<Map<String, Object>>) XContentMapValues.extractValue("datafeeds", stats);
191+
192+
assertEquals(datafeedId, XContentMapValues.extractValue("datafeed_id", datafeedStats.get(0)));
193+
assertEquals("started", XContentMapValues.extractValue("state", datafeedStats.get(0)));
194+
assertThat((String) XContentMapValues.extractValue("assignment_explanation", datafeedStats.get(0)), isEmptyOrNullString());
195+
assertNotNull(XContentMapValues.extractValue("node", datafeedStats.get(0)));
196+
}, 30, TimeUnit.SECONDS);
197+
}
198+
199+
@SuppressWarnings("unchecked")
200+
private void waitForMigration(List<String> expectedMigratedJobs, List<String> expectedMigratedDatafeeds,
201+
List<String> unMigratedJobs, List<String> unMigratedDatafeeds) throws Exception {
202+
assertBusy(() -> {
203+
// wait for the eligible configs to be moved from the clusterstate
204+
Request getClusterState = new Request("GET", "/_cluster/state/metadata");
205+
Response response = client().performRequest(getClusterState);
206+
Map<String, Object> responseMap = entityAsMap(response);
207+
208+
List<Map<String, Object>> jobs =
209+
(List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap);
210+
assertNotNull(jobs);
211+
212+
for (String jobId : expectedMigratedJobs) {
213+
assertJob(jobId, jobs, false);
214+
}
215+
216+
for (String jobId : unMigratedJobs) {
217+
assertJob(jobId, jobs, true);
218+
}
219+
220+
List<Map<String, Object>> datafeeds =
221+
(List<Map<String, Object>>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap);
222+
assertNotNull(datafeeds);
223+
224+
for (String datafeedId : expectedMigratedDatafeeds) {
225+
assertDatafeed(datafeedId, datafeeds, false);
226+
}
227+
228+
for (String datafeedId : unMigratedDatafeeds) {
229+
assertDatafeed(datafeedId, datafeeds, true);
230+
}
231+
232+
}, 30, TimeUnit.SECONDS);
233+
}
234+
235+
private void assertDatafeed(String datafeedId, List<Map<String, Object>> datafeeds, boolean expectedToBePresent) {
236+
Optional<Object> config = datafeeds.stream().map(map -> map.get("datafeed_id"))
237+
.filter(id -> id.equals(datafeedId)).findFirst();
238+
if (expectedToBePresent) {
239+
assertTrue(config.isPresent());
240+
} else {
241+
assertFalse(config.isPresent());
242+
}
243+
}
244+
245+
private void assertJob(String jobId, List<Map<String, Object>> jobs, boolean expectedToBePresent) {
246+
Optional<Object> config = jobs.stream().map(map -> map.get("job_id"))
247+
.filter(id -> id.equals(jobId)).findFirst();
248+
if (expectedToBePresent) {
249+
assertTrue(config.isPresent());
250+
} else {
251+
assertFalse(config.isPresent());
252+
}
253+
}
254+
}

x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ private void tryUpdates() throws IOException {
460460
+ "] is temporarily pending migration", true);
461461

462462
if (datafeedDeleted && randomBoolean()) {
463-
// delete job if the datafeed that refers to it was deleted
463+
// delete job if the datafeed that refers to it was deleted
464464
// otherwise the request is invalid
465465
Request deleteJob = new Request("DELETE", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_CLOSED_JOB_EXTRA_ID);
466466
updateJobExpectingSuccessOr503(OLD_CLUSTER_CLOSED_JOB_EXTRA_ID, deleteJob, "cannot update job as the configuration ["

0 commit comments

Comments
 (0)