Skip to content

Commit 021fbeb

Browse files
authored
Make GetIndexAction cancellable (#87731)
The get-indices API does some nontrivial work on the master and at high index counts the response may be very large and could take a long time to compute. Some clients will time out and retry if it takes too long. Today this API is not properly cancellable which leads to a good deal of wasted work in this situation, and the potentially-enormous response is serialized on a transport worker thread. With this commit we make the API cancellable and move the serialization to a `MANAGEMENT` thread. Backport of #87681 Relates #77466
1 parent 76d0934 commit 021fbeb

File tree

8 files changed

+63
-15
lines changed

8 files changed

+63
-15
lines changed

docs/changelog/87681.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 87681
2+
summary: Make `GetIndexAction` cancellable
3+
area: Indices APIs
4+
type: bug
5+
issues: []
Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.http;
1010

1111
import org.apache.http.client.methods.HttpGet;
12+
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
1213
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
1314
import org.elasticsearch.action.support.PlainActionFuture;
1415
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -37,21 +38,28 @@
3738
import static org.hamcrest.core.IsEqual.equalTo;
3839

3940
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
40-
public class RestGetMappingsCancellationIT extends HttpSmokeTestCase {
41+
public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase {
4142

4243
public void testGetMappingsCancellation() throws Exception {
44+
runTest(GetMappingsAction.NAME, "/test/_mappings");
45+
}
46+
47+
public void testGetIndicesCancellation() throws Exception {
48+
runTest(GetIndexAction.NAME, "/test");
49+
}
50+
51+
private void runTest(String actionName, String endpoint) throws Exception {
4352
internalCluster().startMasterOnlyNode();
4453
internalCluster().startDataOnlyNode();
4554
ensureStableCluster(2);
4655

4756
createIndex("test");
4857
ensureGreen("test");
49-
final String actionName = GetMappingsAction.NAME;
5058
// Add a retryable cluster block that would block the request execution
5159
updateClusterState(currentState -> {
5260
ClusterBlock clusterBlock = new ClusterBlock(
5361
1000,
54-
"Get mappings cancellation test cluster block",
62+
actionName + " cancellation test cluster block",
5563
true,
5664
false,
5765
false,
@@ -62,7 +70,7 @@ public void testGetMappingsCancellation() throws Exception {
6270
return ClusterState.builder(currentState).blocks(ClusterBlocks.builder().addGlobalBlock(clusterBlock).build()).build();
6371
});
6472

65-
final Request request = new Request(HttpGet.METHOD_NAME, "/test/_mappings");
73+
final Request request = new Request(HttpGet.METHOD_NAME, endpoint);
6674
final PlainActionFuture<Response> future = new PlainActionFuture<>();
6775
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));
6876

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
738738
registerHandler.accept(new RestResetFeatureStateAction());
739739
registerHandler.accept(new RestGetFeatureUpgradeStatusAction());
740740
registerHandler.accept(new RestPostFeatureUpgradeAction());
741-
registerHandler.accept(new RestGetIndicesAction());
741+
registerHandler.accept(new RestGetIndicesAction(threadPool));
742742
registerHandler.accept(new RestIndicesStatsAction());
743743
registerHandler.accept(new RestIndicesSegmentsAction(threadPool));
744744
registerHandler.accept(new RestIndicesShardStoresAction());

server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.util.ArrayUtils;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
1720

1821
import java.io.IOException;
22+
import java.util.Map;
1923

2024
/**
2125
* A request to retrieve information about an index.
@@ -137,4 +141,8 @@ public void writeTo(StreamOutput out) throws IOException {
137141
}
138142
}
139143

144+
@Override
145+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
146+
return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
147+
}
140148
}

server/src/main/java/org/elasticsearch/action/admin/indices/get/TransportGetIndexAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
import org.elasticsearch.cluster.metadata.IndexMetadata;
1818
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1919
import org.elasticsearch.cluster.metadata.MappingMetadata;
20-
import org.elasticsearch.cluster.metadata.Metadata;
2120
import org.elasticsearch.cluster.service.ClusterService;
2221
import org.elasticsearch.common.collect.ImmutableOpenMap;
2322
import org.elasticsearch.common.inject.Inject;
2423
import org.elasticsearch.common.settings.IndexScopedSettings;
2524
import org.elasticsearch.common.settings.Settings;
2625
import org.elasticsearch.common.settings.SettingsFilter;
2726
import org.elasticsearch.indices.IndicesService;
27+
import org.elasticsearch.tasks.CancellableTask;
2828
import org.elasticsearch.tasks.Task;
2929
import org.elasticsearch.threadpool.ThreadPool;
3030
import org.elasticsearch.transport.TransportService;
@@ -94,6 +94,7 @@ protected void doMasterOperation(
9494
boolean doneMappings = false;
9595
boolean doneSettings = false;
9696
for (Feature feature : features) {
97+
checkCancellation(task);
9798
switch (feature) {
9899
case MAPPINGS:
99100
if (doneMappings == false) {
@@ -103,7 +104,7 @@ protected void doMasterOperation(
103104
concreteIndices,
104105
request.types(),
105106
indicesService.getFieldFilter(),
106-
Metadata.ON_NEXT_INDEX_FIND_MAPPINGS_NOOP
107+
() -> checkCancellation(task)
107108
);
108109
doneMappings = true;
109110
} catch (IOException e) {
@@ -123,6 +124,7 @@ protected void doMasterOperation(
123124
ImmutableOpenMap.Builder<String, Settings> settingsMapBuilder = ImmutableOpenMap.builder();
124125
ImmutableOpenMap.Builder<String, Settings> defaultSettingsMapBuilder = ImmutableOpenMap.builder();
125126
for (String index : concreteIndices) {
127+
checkCancellation(task);
126128
Settings indexSettings = state.metadata().index(index).getSettings();
127129
if (request.humanReadable()) {
128130
indexSettings = IndexMetadata.addHumanReadableSettings(indexSettings);
@@ -147,4 +149,10 @@ protected void doMasterOperation(
147149
}
148150
listener.onResponse(new GetIndexResponse(concreteIndices, mappingsResult, aliasesResult, settings, defaultSettings, dataStreams));
149151
}
152+
153+
private static void checkCancellation(Task task) {
154+
if (task instanceof CancellableTask) {
155+
((CancellableTask) task).ensureNotCancelled();
156+
}
157+
}
150158
}

server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.transport.TransportService;
2727

2828
import java.io.IOException;
29-
import java.util.concurrent.CancellationException;
3029

3130
public class TransportGetMappingsAction extends TransportClusterInfoAction<GetMappingsRequest, GetMappingsResponse> {
3231

@@ -74,9 +73,9 @@ protected void doMasterOperation(
7473
}
7574
}
7675

77-
private void checkCancellation(Task task) {
78-
if (task instanceof CancellableTask && ((CancellableTask) task).isCancelled()) {
79-
throw new CancellationException("Task cancelled");
76+
private static void checkCancellation(Task task) {
77+
if (task instanceof CancellableTask) {
78+
((CancellableTask) task).ensureNotCancelled();
8079
}
8180
}
8281
}

server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@
1515
import org.elasticsearch.common.logging.DeprecationCategory;
1616
import org.elasticsearch.common.logging.DeprecationLogger;
1717
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.xcontent.StatusToXContentObject;
19+
import org.elasticsearch.http.HttpChannel;
1820
import org.elasticsearch.rest.BaseRestHandler;
1921
import org.elasticsearch.rest.RestRequest;
20-
import org.elasticsearch.rest.action.RestToXContentListener;
22+
import org.elasticsearch.rest.RestStatus;
23+
import org.elasticsearch.rest.action.DispatchingRestToXContentListener;
24+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
25+
import org.elasticsearch.threadpool.ThreadPool;
2126

2227
import java.io.IOException;
2328
import java.util.Collections;
@@ -45,6 +50,12 @@ public class RestGetIndicesAction extends BaseRestHandler {
4550
.collect(Collectors.toSet())
4651
);
4752

53+
private final ThreadPool threadPool;
54+
55+
public RestGetIndicesAction(ThreadPool threadPool) {
56+
this.threadPool = threadPool;
57+
}
58+
4859
@Override
4960
public List<Route> routes() {
5061
return unmodifiableList(asList(new Route(GET, "/{index}"), new Route(HEAD, "/{index}")));
@@ -69,7 +80,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
6980
getIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexRequest.masterNodeTimeout()));
7081
getIndexRequest.humanReadable(request.paramAsBoolean("human", false));
7182
getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
72-
return channel -> client.admin().indices().getIndex(getIndexRequest, new RestToXContentListener<>(channel));
83+
final HttpChannel httpChannel = request.getHttpChannel();
84+
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
85+
.indices()
86+
.getIndex(
87+
getIndexRequest,
88+
new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request).map(
89+
r -> StatusToXContentObject.withStatus(RestStatus.OK, r)
90+
)
91+
);
7392
}
7493

7594
/**

server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesActionTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.rest.action.admin.indices;
1010

1111
import org.elasticsearch.client.node.NodeClient;
12+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1213
import org.elasticsearch.rest.RestRequest;
1314
import org.elasticsearch.test.rest.FakeRestRequest;
1415
import org.elasticsearch.test.rest.RestActionTestCase;
@@ -33,7 +34,7 @@ public void testIncludeTypeNamesWarning() throws IOException {
3334
.withParams(params)
3435
.build();
3536

36-
RestGetIndicesAction handler = new RestGetIndicesAction();
37+
RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool());
3738
handler.prepareRequest(request, mock(NodeClient.class));
3839
assertWarnings(RestGetIndicesAction.TYPES_DEPRECATION_MESSAGE);
3940

@@ -53,7 +54,7 @@ public void testIncludeTypeNamesWarningExists() throws IOException {
5354
.withParams(params)
5455
.build();
5556

56-
RestGetIndicesAction handler = new RestGetIndicesAction();
57+
RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool());
5758
handler.prepareRequest(request, mock(NodeClient.class));
5859
}
5960
}

0 commit comments

Comments
 (0)