@@ -222,7 +222,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception {
222222 assertThat (datafeedManager .isRunning (task .getAllocationId ()), is (true ));
223223 }
224224
225- public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime () throws Exception {
225+ public void testStart_GivenNewlyCreatedJobLookBackAndRealtime () throws Exception {
226226 when (datafeedJob .runLookBack (anyLong (), anyLong ())).thenReturn (1L );
227227 when (datafeedJob .runRealtime ()).thenReturn (1L );
228228
@@ -282,8 +282,45 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
282282 verify (threadPool , times (1 )).executor (MachineLearning .DATAFEED_THREAD_POOL_NAME );
283283 }
284284
285+ public void testDatafeedTaskWaitsUntilJobIsNotStale () {
286+ PersistentTasksCustomMetaData .Builder tasksBuilder = PersistentTasksCustomMetaData .builder ();
287+ addJobTask ("job_id" , "node_id" , JobState .OPENED , tasksBuilder , true );
288+ ClusterState .Builder cs = ClusterState .builder (clusterService .state ())
289+ .metaData (new MetaData .Builder ().putCustom (PersistentTasksCustomMetaData .TYPE , tasksBuilder .build ()));
290+ when (clusterService .state ()).thenReturn (cs .build ());
291+
292+ Consumer <Exception > handler = mockConsumer ();
293+ DatafeedTask task = createDatafeedTask ("datafeed_id" , 0L , 60000L );
294+ datafeedManager .run (task , handler );
295+
296+ // Verify datafeed has not started running yet as job is stale (i.e. even though opened it is part way through relocating)
297+ verify (threadPool , never ()).executor (MachineLearning .DATAFEED_THREAD_POOL_NAME );
298+
299+ tasksBuilder = PersistentTasksCustomMetaData .builder ();
300+ addJobTask ("job_id" , "node_id" , JobState .OPENED , tasksBuilder , true );
301+ addJobTask ("another_job" , "node_id" , JobState .OPENED , tasksBuilder );
302+ ClusterState .Builder anotherJobCs = ClusterState .builder (clusterService .state ())
303+ .metaData (new MetaData .Builder ().putCustom (PersistentTasksCustomMetaData .TYPE , tasksBuilder .build ()));
304+
305+ capturedClusterStateListener .getValue ().clusterChanged (new ClusterChangedEvent ("_source" , anotherJobCs .build (), cs .build ()));
306+
307+ // Still no run
308+ verify (threadPool , never ()).executor (MachineLearning .DATAFEED_THREAD_POOL_NAME );
309+
310+ tasksBuilder = PersistentTasksCustomMetaData .builder ();
311+ addJobTask ("job_id" , "node_id" , JobState .OPENED , tasksBuilder );
312+ ClusterState .Builder jobOpenedCs = ClusterState .builder (clusterService .state ())
313+ .metaData (new MetaData .Builder ().putCustom (PersistentTasksCustomMetaData .TYPE , tasksBuilder .build ()));
314+
315+ capturedClusterStateListener .getValue ().clusterChanged (
316+ new ClusterChangedEvent ("_source" , jobOpenedCs .build (), anotherJobCs .build ()));
317+
318+ // Now it should run as the job state chanded to OPENED
319+ verify (threadPool , times (1 )).executor (MachineLearning .DATAFEED_THREAD_POOL_NAME );
320+ }
321+
285322 public void testDatafeedTaskStopsBecauseJobFailedWhileOpening () {
286- PersistentTasksCustomMetaData .Builder tasksBuilder = PersistentTasksCustomMetaData .builder ();
323+ PersistentTasksCustomMetaData .Builder tasksBuilder = PersistentTasksCustomMetaData .builder ();
287324 addJobTask ("job_id" , "node_id" , JobState .OPENING , tasksBuilder );
288325 ClusterState .Builder cs = ClusterState .builder (clusterService .state ())
289326 .metaData (new MetaData .Builder ().putCustom (PersistentTasksCustomMetaData .TYPE , tasksBuilder .build ()));
@@ -296,7 +333,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
296333 // Verify datafeed has not started running yet as job is still opening
297334 verify (threadPool , never ()).executor (MachineLearning .DATAFEED_THREAD_POOL_NAME );
298335
299- tasksBuilder = PersistentTasksCustomMetaData .builder ();
336+ tasksBuilder = PersistentTasksCustomMetaData .builder ();
300337 addJobTask ("job_id" , "node_id" , JobState .FAILED , tasksBuilder );
301338 ClusterState .Builder updatedCs = ClusterState .builder (clusterService .state ())
302339 .metaData (new MetaData .Builder ().putCustom (PersistentTasksCustomMetaData .TYPE , tasksBuilder .build ()));
@@ -309,7 +346,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
309346 }
310347
311348 public void testDatafeedGetsStoppedWhileWaitingForJobToOpen () {
312- PersistentTasksCustomMetaData .Builder tasksBuilder = PersistentTasksCustomMetaData .builder ();
349+ PersistentTasksCustomMetaData .Builder tasksBuilder = PersistentTasksCustomMetaData .builder ();
313350 addJobTask ("job_id" , "node_id" , JobState .OPENING , tasksBuilder );
314351 ClusterState .Builder cs = ClusterState .builder (clusterService .state ())
315352 .metaData (new MetaData .Builder ().putCustom (PersistentTasksCustomMetaData .TYPE , tasksBuilder .build ()));
@@ -326,7 +363,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() {
326363 datafeedManager .stopDatafeed (task , "test" , StopDatafeedAction .DEFAULT_TIMEOUT );
327364
328365 // Update job state to opened
329- tasksBuilder = PersistentTasksCustomMetaData .builder ();
366+ tasksBuilder = PersistentTasksCustomMetaData .builder ();
330367 addJobTask ("job_id" , "node_id" , JobState .OPENED , tasksBuilder );
331368 ClusterState .Builder updatedCs = ClusterState .builder (clusterService .state ())
332369 .metaData (new MetaData .Builder ().putCustom (PersistentTasksCustomMetaData .TYPE , tasksBuilder .build ()));
0 commit comments