Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions docs/reference/cluster/tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,71 @@ The following command will change the grouping to parent tasks:
GET _tasks?group_by=parents
--------------------------------------------------
// CONSOLE

The grouping can be disabled by specifying `none` as a `group_by` parameter:

[source,js]
--------------------------------------------------
GET _tasks?group_by=none
--------------------------------------------------
// CONSOLE

[float]
=== Identifying running tasks

The `X-Opaque-Id` header, when provided on the HTTP request header, is going to be returned as a header in the response as well as
in the `headers` field in the task information. This allows tracking certain calls, or associating certain tasks with
the client that started them:

[source,sh]
--------------------------------------------------
curl -i -H "X-Opaque-Id: 123456" "http://localhost:9200/_tasks?group_by=parents"
--------------------------------------------------
// NOTCONSOLE

The result will look similar to the following:

[source,js]
--------------------------------------------------
HTTP/1.1 200 OK
X-Opaque-Id: 123456 <1>
content-type: application/json; charset=UTF-8
content-length: 831

{
"tasks" : {
"u5lcZHqcQhu-rUoFaqDphA:45" : {
"node" : "u5lcZHqcQhu-rUoFaqDphA",
"id" : 45,
"type" : "transport",
"action" : "cluster:monitor/tasks/lists",
"start_time_in_millis" : 1513823752749,
"running_time_in_nanos" : 293139,
"cancellable" : false,
"headers" : {
"X-Opaque-Id" : "123456" <2>
},
"children" : [
{
"node" : "u5lcZHqcQhu-rUoFaqDphA",
"id" : 46,
"type" : "direct",
"action" : "cluster:monitor/tasks/lists[n]",
"start_time_in_millis" : 1513823752750,
"running_time_in_nanos" : 92133,
"cancellable" : false,
"parent_task_id" : "u5lcZHqcQhu-rUoFaqDphA:45",
"headers" : {
"X-Opaque-Id" : "123456" <3>
}
}
]
}
}
}
--------------------------------------------------
// NOTCONSOLE

<1> id as a part of the response header
<2> id for the task that was initiated by the REST request
<3> the child task of the task initiated by the REST request
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private SearchRequest firstSearchRequest;
private PlainActionFuture<BulkByScrollResponse> listener;
private String scrollId;
private ThreadPool threadPool;
private TaskManager taskManager;
private BulkByScrollTask testTask;
private WorkerBulkByScrollTaskState worker;
Expand All @@ -141,7 +143,8 @@ public void setupForTest() {
testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
listener = new PlainActionFuture<>();
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY);
threadPool = new TestThreadPool(getClass().getName());
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
testTask.setWorker(testRequest.getRequestsPerSecond(), null);
worker = testTask.getWorkerState();
Expand All @@ -159,8 +162,9 @@ private void setupClient(ThreadPool threadPool) {
}

@After
public void tearDownAndVerifyCommonStuff() {
public void tearDownAndVerifyCommonStuff() throws Exception {
client.close();
terminate(threadPool);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

Expand All @@ -53,7 +54,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
@Before
public void createTask() {
slices = between(2, 50);
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
task.setWorkerCount(slices);
}

Expand Down Expand Up @@ -101,7 +102,8 @@ public void testRethrottleSuccessfulResponse() {
List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
for (int i = 0; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices,
Expand All @@ -121,7 +123,8 @@ public void testRethrottleWithSomeSucceeded() {
List<TaskInfo> tasks = new ArrayList<>();
for (int i = succeeded; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices - succeeded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,44 @@
field3: value
- match: { hits.total: 1 }
- match: { hits.hits.0._id: q3 }

---
"Create a task result record in the old cluster":
- do:
indices.create:
index: reindexed_index
body:
settings:
index:
number_of_replicas: 0
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "1"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "2"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "3"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "4"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "5"}'

- do:
reindex:
wait_for_completion: false
body:
source:
index: reindexed_index
size: 1
dest:
index: reindexed_index_copy
- match: {task: '/.+:\d+/'}
- set: {task: task}

- do:
tasks.get:
wait_for_completion: true
task_id: $task
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,42 @@
field3: value
- match: { hits.total: 1 }
- match: { hits.hits.0._id: q3 }

---
"Find a task result record from the old cluster":
- do:
search:
index: .tasks
body:
query:
match_all: {}
- match: { hits.total: 1 }
- match: { hits.hits.0._id: '/.+:\d+/' }
- set: {hits.hits.0._id: task_id}

- do:
tasks.get:
wait_for_completion: true
task_id: $task_id

- is_false: node_failures
- is_true: task

- do:
headers: { "X-Opaque-Id": "Reindexing Again" }
reindex:
wait_for_completion: false
body:
source:
index: reindexed_index_copy
size: 1
dest:
index: reindexed_index_another_copy
- match: { task: '/.+:\d+/' }
- set: { task: task_id }

- do:
tasks.get:
wait_for_completion: true
task_id: $task_id
- match: { task.headers.X-Opaque-Id: "Reindexing Again" }
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"group_by": {
"type" : "enum",
"description": "Group tasks by nodes or parent/child relationships",
"options" : ["nodes", "parents"],
"options" : ["nodes", "parents", "none"],
"default" : "nodes"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,19 @@
group_by: parents

- is_true: tasks

---
"tasks_list headers":
- skip:
version: " - 6.99.99"
reason: task headers has been added in 7.0.0

- do:
headers: { "X-Opaque-Id": "That is me" }
tasks.list:
actions: "cluster:monitor/tasks/lists"
group_by: none

- is_true: tasks
- match: { tasks.0.headers.X-Opaque-Id: "That is me" }

Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;

Expand All @@ -324,6 +325,7 @@
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;

Expand Down Expand Up @@ -362,7 +364,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
Set<String> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of("X-Opaque-Id")
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* Transport action that can be used to cancel currently running cancellable tasks.
* <p>
* For a task to be cancellable it has to return an instance of
* {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String, TaskId)}
* {@link CancellableTask} from {@link TransportRequest#createTask}
*/
public class TransportCancelTasksAction extends TransportTasksAction<CancellableTask, CancelTasksRequest, CancelTasksResponse, TaskInfo> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Param
return builder;
}

/**
 * Presents a flat list of tasks, i.e. without any node or parent/child grouping.
 * Used when the list-tasks API is called with {@code group_by=none}.
 *
 * @param builder the builder to write the flat {@code tasks} array into
 * @param params  serialization parameters forwarded to each {@link TaskInfo}
 * @return the same builder, for chaining
 * @throws IOException if writing to the builder fails
 */
public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException {
// Emit shared failure information first, mirroring the other toXContentGroupedBy* variants.
toXContentCommon(builder, params);
builder.startArray("tasks");
for (TaskInfo taskInfo : getTasks()) {
// Each task is rendered as its own object element of the "tasks" array.
builder.startObject();
taskInfo.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
/**
* Max length of the source document to include into toString()
*
* @see ReplicationRequest#createTask(long, java.lang.String, java.lang.String, org.elasticsearch.tasks.TaskId)
* @see ReplicationRequest#createTask
*/
static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -428,9 +429,9 @@ public boolean isSuggestOnly() {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId) {
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
public String getDescription() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -117,8 +118,8 @@ public void readFrom(StreamInput in) throws IOException {
}

@Override
// Creates the task object that represents this search-scroll request in the task manager.
// The headers map carries request headers (e.g. X-Opaque-Id) that should be attached to
// the task so they show up in the task-management APIs.
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, getDescription(), parentTaskId, headers);
}
}

@Override
Expand Down
Loading