4646import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertThrows ;
4747import static org .hamcrest .Matchers .empty ;
4848import static org .hamcrest .Matchers .equalTo ;
49+ import static org .hamcrest .Matchers .hasSize ;
4950import static org .hamcrest .Matchers .notNullValue ;
5051import static org .hamcrest .Matchers .nullValue ;
52+ import static org .hamcrest .core .Is .is ;
5153
5254@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .SUITE , minNumDataNodes = 2 )
5355public class PersistentTasksExecutorIT extends ESIntegTestCase {
@@ -155,11 +157,8 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
155157 Settings nodeSettings = Settings .builder ().put (nodeSettings (0 )).put ("node.attr.test_attr" , "test" ).build ();
156158 String newNode = internalCluster ().startNode (nodeSettings );
157159 String newNodeId = internalCluster ().clusterService (newNode ).localNode ().getId ();
158- assertBusy (() -> {
159- // Wait for the task to start
160- assertThat (client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" ).get ().getTasks ()
161- .size (), equalTo (1 ));
162- });
160+ waitForTaskToStart ();
161+
163162 TaskInfo taskInfo = client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" )
164163 .get ().getTasks ().get (0 );
165164
@@ -199,11 +198,7 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception
199198
200199 TestPersistentTasksExecutor .setNonClusterStateCondition (true );
201200
202- assertBusy (() -> {
203- // Wait for the task to start
204- assertThat (client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" ).get ().getTasks ()
205- .size (), equalTo (1 ));
206- });
201+ waitForTaskToStart ();
207202 TaskInfo taskInfo = client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" )
208203 .get ().getTasks ().get (0 );
209204
@@ -221,12 +216,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
221216 PlainActionFuture <PersistentTask <TestParams >> future = new PlainActionFuture <>();
222217 persistentTasksService .sendStartRequest (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
223218 String taskId = future .get ().getId ();
224-
225- assertBusy (() -> {
226- // Wait for the task to start
227- assertThat (client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" ).get ().getTasks ()
228- .size (), equalTo (1 ));
229- });
219+ waitForTaskToStart ();
230220 TaskInfo firstRunningTask = client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" )
231221 .get ().getTasks ().get (0 );
232222
@@ -307,6 +297,62 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception {
307297 });
308298 }
309299
300+ public void testUnassignRunningPersistentTask () throws Exception {
301+ PersistentTasksClusterService persistentTasksClusterService =
302+ internalCluster ().getInstance (PersistentTasksClusterService .class , internalCluster ().getMasterName ());
303+ // Speed up rechecks to a rate that is quicker than what settings would allow
304+ persistentTasksClusterService .setRecheckInterval (TimeValue .timeValueMillis (1 ));
305+ PersistentTasksService persistentTasksService = internalCluster ().getInstance (PersistentTasksService .class );
306+ PlainActionFuture <PersistentTask <TestParams >> future = new PlainActionFuture <>();
307+ TestParams testParams = new TestParams ("Blah" );
308+ testParams .setExecutorNodeAttr ("test" );
309+ persistentTasksService .sendStartRequest (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , testParams , future );
310+ PersistentTask <TestParams > task = future .get ();
311+ String taskId = task .getId ();
312+
313+ Settings nodeSettings = Settings .builder ().put (nodeSettings (0 )).put ("node.attr.test_attr" , "test" ).build ();
314+ internalCluster ().startNode (nodeSettings );
315+
316+ waitForTaskToStart ();
317+
318+ PlainActionFuture <PersistentTask <?>> unassignmentFuture = new PlainActionFuture <>();
319+
320+ // Disallow re-assignment after it is unallocated to verify master and node state
321+ TestPersistentTasksExecutor .setNonClusterStateCondition (false );
322+
323+ persistentTasksClusterService .unassignPersistentTask (taskId ,
324+ task .getAllocationId () + 1 ,
325+ "unassignment test" ,
326+ unassignmentFuture );
327+ PersistentTask <?> unassignedTask = unassignmentFuture .get ();
328+ assertThat (unassignedTask .getId (), equalTo (taskId ));
329+ assertThat (unassignedTask .getAssignment ().getExplanation (), equalTo ("unassignment test" ));
330+ assertThat (unassignedTask .getAssignment ().getExecutorNode (), is (nullValue ()));
331+
332+ assertBusy (() -> {
333+ // Verify that the task is NOT running on the node
334+ List <TaskInfo > tasks = client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" ).get ()
335+ .getTasks ();
336+ assertThat (tasks .size (), equalTo (0 ));
337+
338+ // Verify that the task is STILL in internal cluster state
339+ assertClusterStateHasTask (taskId );
340+ });
341+
342+ // Allow it to be reassigned again to the same node
343+ TestPersistentTasksExecutor .setNonClusterStateCondition (true );
344+
345+ // Verify it starts again
346+ waitForTaskToStart ();
347+
348+ assertClusterStateHasTask (taskId );
349+
350+ // Complete or cancel the running task
351+ TaskInfo taskInfo = client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" )
352+ .get ().getTasks ().get (0 );
353+ stopOrCancelTask (taskInfo .getTaskId ());
354+ }
355+
310356 private void stopOrCancelTask (TaskId taskId ) {
311357 if (randomBoolean ()) {
312358 logger .info ("Completing the running task" );
@@ -322,6 +368,25 @@ private void stopOrCancelTask(TaskId taskId) {
322368 }
323369 }
324370
371+ private static void waitForTaskToStart () throws Exception {
372+ assertBusy (() -> {
373+ // Wait for the task to start
374+ assertThat (client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" ).get ().getTasks ()
375+ .size (), equalTo (1 ));
376+ });
377+ }
378+
379+ private static void assertClusterStateHasTask (String taskId ) {
380+ Collection <PersistentTask <?>> clusterTasks = ((PersistentTasksCustomMetaData ) internalCluster ()
381+ .clusterService ()
382+ .state ()
383+ .getMetaData ()
384+ .custom (PersistentTasksCustomMetaData .TYPE ))
385+ .tasks ();
386+ assertThat (clusterTasks , hasSize (1 ));
387+ assertThat (clusterTasks .iterator ().next ().getId (), equalTo (taskId ));
388+ }
389+
325390 private void assertNoRunningTasks () throws Exception {
326391 assertBusy (() -> {
327392 // Wait for the task to finish
0 commit comments