diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 71918cadb55d8..fb914a34bfbaa 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -247,6 +247,45 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } + /** + * This unassigns a task from any node, i.e. it is assigned to a {@code null} node with the provided reason. + * + * Since the assignment executor node is null, the {@link PersistentTasksClusterService} will attempt to reassign it to a valid + * node quickly. + * + * @param taskId the id of a persistent task + * @param taskAllocationId the expected allocation id of the persistent task + * @param reason the reason for unassigning the task from any node + * @param listener the listener that will be called when task is unassigned + */ + public void unassignPersistentTask(final String taskId, + final long taskAllocationId, + final String reason, + final ActionListener> listener) { + clusterService.submitStateUpdateTask("unassign persistent task from any node", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); + if (tasksInProgress.hasTask(taskId, taskAllocationId)) { + logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId); + return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason))); + } else { + throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId); + } + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId)); + } + }); + } + /** * Creates a new {@link Assignment} for the given persistent task. * @@ -263,7 +302,7 @@ private Assignment createAssignment(final AssignmentDecision decision = decider.canAssign(); if (decision.getType() == AssignmentDecision.Type.NO) { - return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"); + return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"); } return persistentTasksExecutor.getAssignment(taskParams, currentState); @@ -404,6 +443,10 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus } } + private static Assignment unassignedAssignment(String reason) { + return new Assignment(null, reason); + } + /** * Class to periodically try to reassign unassigned persistent tasks. */ diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index ebf77d1e80360..c3863e8ec08e4 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -20,7 +20,9 @@ package org.elasticsearch.persistent; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -63,10 +65,13 @@ import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -464,6 +469,56 @@ public void testPeriodicRecheck() throws Exception { }); } + public void testUnassignTask() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder( + clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_node_1") + .masterNodeId("_node_1") + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)); + + String unassignedId = addTask(tasks, "unassign", "_node_2"); + + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + setState(clusterService, clusterState); + PersistentTasksClusterService service = createService((params, currentState) -> + new Assignment("_node_2", "test")); + service.unassignPersistentTask(unassignedId, tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap( + task -> { + assertThat(task.getAssignment().getExecutorNode(), is(nullValue())); + assertThat(task.getId(), equalTo(unassignedId)); + assertThat(task.getAssignment().getExplanation(), equalTo("unassignment test")); + }, + e -> fail() + )); + } + + public void testUnassignNonExistentTask() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder( + clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT)) + .localNodeId("_node_1") + .masterNodeId("_node_1") + .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)); + + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + setState(clusterService, clusterState); + PersistentTasksClusterService service = createService((params, currentState) -> + new Assignment("_node_2", "test")); + service.unassignPersistentTask("missing-task", tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap( + task -> fail(), + e -> assertThat(e, instanceOf(ResourceNotFoundException.class)) + )); + } + private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) { AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure); AtomicReference state = new AtomicReference<>(initialState); @@ -728,9 +783,11 @@ private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuil tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), assignment).build())); } - private void addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) { - tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), + private String addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) { + String id = UUIDs.base64UUID(); + tasks.addTask(id, TestPersistentTasksExecutor.NAME, new TestParams(param), new Assignment(node, "explanation: " + param)); + return id; } private DiscoveryNode newNode(String nodeId) { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 8f6393986da9d..08c32665adc58 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -46,8 +46,10 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.Is.is; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class PersistentTasksExecutorIT extends ESIntegTestCase { @@ -155,11 +157,8 @@ public void testPersistentActionWithNoAvailableNode() throws Exception { Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); String newNode = internalCluster().startNode(nodeSettings); String newNodeId = internalCluster().clusterService(newNode).localNode().getId(); - assertBusy(() -> { - // Wait for the task to start - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() - .size(), equalTo(1)); - }); + waitForTaskToStart(); + TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") .get().getTasks().get(0); @@ -199,11 +198,7 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception TestPersistentTasksExecutor.setNonClusterStateCondition(true); - assertBusy(() -> { - // Wait for the task to start - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() - .size(), equalTo(1)); - }); + waitForTaskToStart(); TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") .get().getTasks().get(0); @@ -221,12 +216,7 @@ public void testPersistentActionStatusUpdate() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); String taskId = future.get().getId(); - - assertBusy(() -> { - // Wait for the task to start - assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() - .size(), equalTo(1)); - }); + waitForTaskToStart(); TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") .get().getTasks().get(0); @@ -307,6 +297,62 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception { }); } + public void testUnassignRunningPersistentTask() throws Exception { + PersistentTasksClusterService persistentTasksClusterService = + internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName()); + // Speed up rechecks to a rate that is quicker than what settings would allow + persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1)); + PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); + PlainActionFuture> future = new PlainActionFuture<>(); + TestParams testParams = new TestParams("Blah"); + testParams.setExecutorNodeAttr("test"); + persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); + PersistentTask task = future.get(); + String taskId = task.getId(); + + Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); + internalCluster().startNode(nodeSettings); + + waitForTaskToStart(); + + PlainActionFuture> unassignmentFuture = new PlainActionFuture<>(); + + // Disallow re-assignment after it is unallocated to verify master and node state + TestPersistentTasksExecutor.setNonClusterStateCondition(false); + + persistentTasksClusterService.unassignPersistentTask(taskId, + task.getAllocationId() + 1, + "unassignment test", + unassignmentFuture); + PersistentTask unassignedTask = unassignmentFuture.get(); + assertThat(unassignedTask.getId(), equalTo(taskId)); + assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test")); + assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue())); + + assertBusy(() -> { + // Verify that the task is NOT running on the node + List tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get() + .getTasks(); + assertThat(tasks.size(), equalTo(0)); + + // Verify that the task is STILL in internal cluster state + assertClusterStateHasTask(taskId); + }); + + // Allow it to be reassigned again to the same node + TestPersistentTasksExecutor.setNonClusterStateCondition(true); + + // Verify it starts again + waitForTaskToStart(); + + assertClusterStateHasTask(taskId); + + // Complete or cancel the running task + TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get().getTasks().get(0); + stopOrCancelTask(taskInfo.getTaskId()); + } + private void stopOrCancelTask(TaskId taskId) { if (randomBoolean()) { logger.info("Completing the running task"); @@ -322,6 +368,25 @@ private void stopOrCancelTask(TaskId taskId) { } } + private static void waitForTaskToStart() throws Exception { + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() + .size(), equalTo(1)); + }); + } + + private static void assertClusterStateHasTask(String taskId) { + Collection> clusterTasks = ((PersistentTasksCustomMetaData) internalCluster() + .clusterService() + .state() + .getMetaData() + .custom(PersistentTasksCustomMetaData.TYPE)) + .tasks(); + assertThat(clusterTasks, hasSize(1)); + assertThat(clusterTasks.iterator().next().getId(), equalTo(taskId)); + } + private void assertNoRunningTasks() throws Exception { assertBusy(() -> { // Wait for the task to finish