Skip to content

Commit 8202bf2

Browse files
committed
Don't wait for completion of list tasks tasks when wait_for_completion flag is set
Waiting for completion of list tasks tasks can cause an infinite loop of a list tasks task waiting for its own completion or completion of its children. To reproduce run: ``` curl "localhost:9200/_tasks?wait_for_completion" ```
1 parent 201fc06 commit 8202bf2

File tree

3 files changed

+23
-1
lines changed

3 files changed

+23
-1
lines changed

core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,13 @@ protected void processTasks(ListTasksRequest request, Consumer<Task> operation)
8484
long timeoutTime = System.nanoTime() + timeout.nanos();
8585
super.processTasks(request, operation.andThen((Task t) -> {
8686
while (System.nanoTime() - timeoutTime < 0) {
87-
if (taskManager.getTask(t.getId()) == null) {
87+
Task task = taskManager.getTask(t.getId());
88+
if (task == null) {
89+
return;
90+
}
91+
if (task.getAction().startsWith(ListTasksAction.NAME)) {
92+
// It doesn't make sense to wait for List Tasks and it can cause an infinite loop of the task waiting
93+
// for itself of one of its child tasks
8894
return;
8995
}
9096
try {

core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/tasks/RestListTasksAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.Strings;
2525
import org.elasticsearch.common.inject.Inject;
2626
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.common.unit.TimeValue;
2728
import org.elasticsearch.rest.BaseRestHandler;
2829
import org.elasticsearch.rest.RestChannel;
2930
import org.elasticsearch.rest.RestController;
@@ -51,6 +52,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
5152
String[] actions = Strings.splitStringByCommaToArray(request.param("actions"));
5253
TaskId parentTaskId = new TaskId(request.param("parent_task_id"));
5354
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", false);
55+
TimeValue timeout = request.paramAsTime("timeout", null);
5456

5557
ListTasksRequest listTasksRequest = new ListTasksRequest();
5658
listTasksRequest.setTaskId(taskId);
@@ -59,6 +61,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
5961
listTasksRequest.setActions(actions);
6062
listTasksRequest.setParentTaskId(parentTaskId);
6163
listTasksRequest.setWaitForCompletion(waitForCompletion);
64+
listTasksRequest.setTimeout(timeout);
6265
client.admin().cluster().listTasks(listTasksRequest, new RestToXContentListener<>(channel));
6366
}
6467
}

core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.ElasticsearchTimeoutException;
2222
import org.elasticsearch.action.FailedNodeException;
2323
import org.elasticsearch.action.ListenableActionFuture;
24+
import org.elasticsearch.action.TaskOperationFailure;
2425
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
2526
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
2627
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
@@ -59,6 +60,7 @@
5960
import java.util.function.Function;
6061

6162
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
63+
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
6264
import static org.hamcrest.Matchers.either;
6365
import static org.hamcrest.Matchers.emptyCollectionOf;
6466
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -406,6 +408,17 @@ public void testTasksListWaitForNoTask() throws Exception {
406408
assertThat(waitResponseFuture.get().getTasks(), emptyCollectionOf(TaskInfo.class));
407409
}
408410

411+
public void testTasksWaitForAllTask() throws Exception {
412+
// Spin up a request to wait for all tasks in the cluster to make sure it doesn't cause an infinite loop
413+
ListTasksResponse response = client().admin().cluster().prepareListTasks().setWaitForCompletion(true)
414+
.setTimeout(timeValueSeconds(10)).get();
415+
416+
// It should finish quickly and without complaint and list the list tasks themselves
417+
assertThat(response.getNodeFailures(), emptyCollectionOf(FailedNodeException.class));
418+
assertThat(response.getTaskFailures(), emptyCollectionOf(TaskOperationFailure.class));
419+
assertThat(response.getTasks().size(), greaterThanOrEqualTo(1));
420+
}
421+
409422
@Override
410423
public void tearDown() throws Exception {
411424
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {

0 commit comments

Comments
 (0)