@@ -204,12 +204,15 @@ private void upgradedClusterTests() throws Exception {
204204 datafeedStarted = startMigratedDatafeed (OLD_CLUSTER_STOPPED_DATAFEED_ID );
205205 }
206206
207- // wait for the closed and open jobs and datafeed to be migrated
208- waitForMigration (Arrays .asList (OLD_CLUSTER_CLOSED_JOB_ID , OLD_CLUSTER_OPEN_JOB_ID ),
209- Arrays .asList (OLD_CLUSTER_STOPPED_DATAFEED_ID , OLD_CLUSTER_STARTED_DATAFEED_ID ));
207+ // wait for the closed job and datafeed to be migrated
208+ waitForMigration (OLD_CLUSTER_CLOSED_JOB_ID , OLD_CLUSTER_STOPPED_DATAFEED_ID );
210209
211- checkTaskParamsAreUpdated (OLD_CLUSTER_OPEN_JOB_ID , OLD_CLUSTER_STARTED_DATAFEED_ID );
212- checkJobsMarkedAsMigrated (Arrays .asList (OLD_CLUSTER_CLOSED_JOB_ID , OLD_CLUSTER_OPEN_JOB_ID ));
210+ // The open job and datafeed may or may not be migrated depending on how they were allocated.
211+ // Migration will only occur once all nodes in the cluster are v6.6.0 or higher
212+ // open jobs will only be migrated once they become unallocated. The open job
213+ // will only meet these conditions if it is running on the last node to be
214+ // upgraded
215+ waitForPossibleMigration (OLD_CLUSTER_OPEN_JOB_ID , OLD_CLUSTER_STARTED_DATAFEED_ID );
213216
214217 // the job and datafeed left open during upgrade should
215218 // be assigned to a node
@@ -234,6 +237,11 @@ private void upgradedClusterTests() throws Exception {
234237 Request closeJob = new Request ("POST" , "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close" );
235238 client ().performRequest (closeJob );
236239
240+ // if the open job wasn't migrated previously it should be now after it has been closed
241+ waitForMigration (OLD_CLUSTER_OPEN_JOB_ID , OLD_CLUSTER_STARTED_DATAFEED_ID );
242+ checkJobsMarkedAsMigrated (Arrays .asList (OLD_CLUSTER_CLOSED_JOB_ID , OLD_CLUSTER_OPEN_JOB_ID ));
243+
244+ // and the job left open can be deleted
237245 Request deleteDatafeed = new Request ("DELETE" , "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID );
238246 client ().performRequest (deleteDatafeed );
239247 Request deleteJob = new Request ("DELETE" , "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID );
@@ -323,7 +331,7 @@ private void checkJobsMarkedAsMigrated(List<String> jobIds) throws IOException {
323331 }
324332
325333 @ SuppressWarnings ("unchecked" )
326- private void checkTaskParamsAreUpdated (String jobId , String datafeedId ) throws Exception {
334+ private void checkTaskParams (String jobId , String datafeedId , boolean expectedUpdated ) throws Exception {
327335 Request getClusterState = new Request ("GET" , "/_cluster/state/metadata" );
328336 Response response = client ().performRequest (getClusterState );
329337 Map <String , Object > responseMap = entityAsMap (response );
@@ -336,12 +344,21 @@ private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws E
336344 assertNotNull (id );
337345 if (id .equals (MlTasks .jobTaskId (jobId ))) {
338346 Object jobParam = XContentMapValues .extractValue ("task.xpack/ml/job.params.job" , task );
339- assertNotNull (jobParam );
347+ if (expectedUpdated ) {
348+ assertNotNull (jobParam );
349+ } else {
350+ assertNull (jobParam );
351+ }
340352 } else if (id .equals (MlTasks .datafeedTaskId (datafeedId ))) {
341353 Object jobIdParam = XContentMapValues .extractValue ("task.xpack/ml/datafeed.params.job_id" , task );
342- assertNotNull (jobIdParam );
343354 Object indices = XContentMapValues .extractValue ("task.xpack/ml/datafeed.params.indices" , task );
344- assertNotNull (indices );
355+ if (expectedUpdated ) {
356+ assertNotNull (jobIdParam );
357+ assertNotNull (indices );
358+ } else {
359+ assertNull (jobIdParam );
360+ assertNull (indices );
361+ }
345362 }
346363 }
347364 }
@@ -373,7 +390,7 @@ private void assertConfigInClusterState() throws IOException {
373390 }
374391
375392 @ SuppressWarnings ("unchecked" )
376- private void waitForMigration (List < String > expectedMigratedJobs , List < String > expectedMigratedDatafeeds ) throws Exception {
393+ private void waitForMigration (String expectedMigratedJobId , String expectedMigratedDatafeedId ) throws Exception {
377394 assertBusy (() -> {
378395 // wait for the eligible configs to be moved from the clusterstate
379396 Request getClusterState = new Request ("GET" , "/_cluster/state/metadata" );
@@ -384,23 +401,55 @@ private void waitForMigration(List<String> expectedMigratedJobs, List<String> ex
384401 (List <Map <String , Object >>) XContentMapValues .extractValue ("metadata.ml.jobs" , responseMap );
385402
386403 if (jobs != null ) {
387- for (String jobId : expectedMigratedJobs ) {
388- assertJobMigrated (jobId , jobs );
389- }
404+ assertJobMigrated (expectedMigratedJobId , jobs );
390405 }
391406
392407 List <Map <String , Object >> datafeeds =
393408 (List <Map <String , Object >>) XContentMapValues .extractValue ("metadata.ml.datafeeds" , responseMap );
394409
395410 if (datafeeds != null ) {
396- for (String datafeedId : expectedMigratedDatafeeds ) {
397- assertDatafeedMigrated (datafeedId , datafeeds );
398- }
411+ assertDatafeedMigrated (expectedMigratedDatafeedId , datafeeds );
399412 }
400413
401414 }, 30 , TimeUnit .SECONDS );
402415 }
403416
417+ @ SuppressWarnings ("unchecked" )
418+ private void waitForPossibleMigration (String perhapsMigratedJobId , String perhapsMigratedDatafeedId ) throws Exception {
419+ assertBusy (() -> {
420+ Request getClusterState = new Request ("GET" , "/_cluster/state/metadata" );
421+ Response response = client ().performRequest (getClusterState );
422+ Map <String , Object > responseMap = entityAsMap (response );
423+
424+ List <Map <String , Object >> jobs =
425+ (List <Map <String , Object >>) XContentMapValues .extractValue ("metadata.ml.jobs" , responseMap );
426+
427+ boolean jobMigrated = true ;
428+ if (jobs != null ) {
429+ jobMigrated = jobs .stream ().map (map -> map .get ("job_id" ))
430+ .noneMatch (id -> id .equals (perhapsMigratedJobId ));
431+ }
432+
433+ List <Map <String , Object >> datafeeds =
434+ (List <Map <String , Object >>) XContentMapValues .extractValue ("metadata.ml.datafeeds" , responseMap );
435+
436+ boolean datafeedMigrated = true ;
437+ if (datafeeds != null ) {
438+ datafeedMigrated = datafeeds .stream ().map (map -> map .get ("datafeed_id" ))
439+ .noneMatch (id -> id .equals (perhapsMigratedDatafeedId ));
440+ }
441+
442+ if (jobMigrated ) {
443+ // if the job is migrated the datafeed should also be
444+ assertTrue (datafeedMigrated );
445+ checkJobsMarkedAsMigrated (Collections .singletonList (perhapsMigratedJobId ));
446+ }
447+
448+ // if migrated the persistent task params should have been updated
449+ checkTaskParams (perhapsMigratedJobId , perhapsMigratedDatafeedId , jobMigrated );
450+ });
451+ }
452+
404453 @ SuppressWarnings ("unchecked" )
405454 private void waitForJobToBeAssigned (String jobId ) throws Exception {
406455 assertBusy (() -> {
0 commit comments