From 90e02948c49a8eb29728c6fe2b826614d1476321 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 16 Jun 2022 10:28:51 +0100 Subject: [PATCH] Make GetIndexAction cancellable The get-indices API does some nontrivial work on the master and at high index counts the response may be very large and could take a long time to compute. Some clients will time out and retry if it takes too long. Today this API is not properly cancellable which leads to a good deal of wasted work in this situation, and the potentially-enormous response is serialized on a transport worker thread. With this commit we make the API cancellable and move the serialization to a `MANAGEMENT` thread. Backport of #87681 Relates #77466 --- docs/changelog/87681.yaml | 5 ++++ ... RestClusterInfoActionCancellationIT.java} | 16 +++++++++---- .../elasticsearch/action/ActionModule.java | 2 +- .../admin/indices/get/GetIndexRequest.java | 8 +++++++ .../indices/get/TransportGetIndexAction.java | 12 ++++++++-- .../get/TransportGetMappingsAction.java | 7 +++--- .../admin/indices/RestGetIndicesAction.java | 23 +++++++++++++++++-- .../indices/RestGetIndicesActionTests.java | 5 ++-- 8 files changed, 63 insertions(+), 15 deletions(-) create mode 100644 docs/changelog/87681.yaml rename qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/{RestGetMappingsCancellationIT.java => RestClusterInfoActionCancellationIT.java} (90%) diff --git a/docs/changelog/87681.yaml b/docs/changelog/87681.yaml new file mode 100644 index 0000000000000..ab6bbe19da6b6 --- /dev/null +++ b/docs/changelog/87681.yaml @@ -0,0 +1,5 @@ +pr: 87681 +summary: Make `GetIndexAction` cancellable +area: Indices APIs +type: bug +issues: [] diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestGetMappingsCancellationIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java similarity index 90% rename from qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestGetMappingsCancellationIT.java rename to qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java index 380e1f498a79c..060155ce39347 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestGetMappingsCancellationIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java @@ -9,6 +9,7 @@ package org.elasticsearch.http; import org.apache.http.client.methods.HttpGet; +import org.elasticsearch.action.admin.indices.get.GetIndexAction; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -37,21 +38,28 @@ import static org.hamcrest.core.IsEqual.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) -public class RestGetMappingsCancellationIT extends HttpSmokeTestCase { +public class RestClusterInfoActionCancellationIT extends HttpSmokeTestCase { public void testGetMappingsCancellation() throws Exception { + runTest(GetMappingsAction.NAME, "/test/_mappings"); + } + + public void testGetIndicesCancellation() throws Exception { + runTest(GetIndexAction.NAME, "/test"); + } + + private void runTest(String actionName, String endpoint) throws Exception { internalCluster().startMasterOnlyNode(); internalCluster().startDataOnlyNode(); ensureStableCluster(2); createIndex("test"); ensureGreen("test"); - final String actionName = GetMappingsAction.NAME; // Add a retryable cluster block that would block the request execution updateClusterState(currentState -> { ClusterBlock clusterBlock = new ClusterBlock( 1000, - "Get mappings cancellation test cluster block", + actionName + " cancellation test cluster block", true, false, false, @@ -62,7 +70,7 @@ public void testGetMappingsCancellation() throws Exception { return ClusterState.builder(currentState).blocks(ClusterBlocks.builder().addGlobalBlock(clusterBlock).build()).build(); }); - final Request request = new Request(HttpGet.METHOD_NAME, "/test/_mappings"); + final Request request = new Request(HttpGet.METHOD_NAME, endpoint); final PlainActionFuture future = new PlainActionFuture<>(); final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future)); diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index fefbe2928642b..99deaf55ab758 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -738,7 +738,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestResetFeatureStateAction()); registerHandler.accept(new RestGetFeatureUpgradeStatusAction()); registerHandler.accept(new RestPostFeatureUpgradeAction()); - registerHandler.accept(new RestGetIndicesAction()); + registerHandler.accept(new RestGetIndicesAction(threadPool)); registerHandler.accept(new RestIndicesStatsAction()); registerHandler.accept(new RestIndicesSegmentsAction(threadPool)); registerHandler.accept(new RestIndicesShardStoresAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexRequest.java index abed6cc572afa..d6aed5e3ba8ec 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/get/GetIndexRequest.java @@ -14,8 +14,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.ArrayUtils; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; /** * A request to retrieve information about an index. @@ -137,4 +141,8 @@ public void writeTo(StreamOutput out) throws IOException { } } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/get/TransportGetIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/get/TransportGetIndexAction.java index c703a4b4a1b05..63e8b33016dfe 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/get/TransportGetIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/get/TransportGetIndexAction.java @@ -17,7 +17,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetadata; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; @@ -25,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -94,6 +94,7 @@ protected void doMasterOperation( boolean doneMappings = false; boolean doneSettings = false; for (Feature feature : features) { + checkCancellation(task); switch (feature) { case MAPPINGS: if (doneMappings == false) { @@ -103,7 +104,7 @@ protected void doMasterOperation( concreteIndices, request.types(), indicesService.getFieldFilter(), - Metadata.ON_NEXT_INDEX_FIND_MAPPINGS_NOOP + () -> checkCancellation(task) ); doneMappings = true; } catch (IOException e) { @@ -123,6 +124,7 @@ protected void doMasterOperation( ImmutableOpenMap.Builder settingsMapBuilder = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder defaultSettingsMapBuilder = ImmutableOpenMap.builder(); for (String index : concreteIndices) { + checkCancellation(task); Settings indexSettings = state.metadata().index(index).getSettings(); if (request.humanReadable()) { indexSettings = IndexMetadata.addHumanReadableSettings(indexSettings); @@ -147,4 +149,10 @@ protected void doMasterOperation( } listener.onResponse(new GetIndexResponse(concreteIndices, mappingsResult, aliasesResult, settings, defaultSettings, dataStreams)); } + + private static void checkCancellation(Task task) { + if (task instanceof CancellableTask) { + ((CancellableTask) task).ensureNotCancelled(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java index 5d39e84dad799..ff58a023a5612 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetMappingsAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.concurrent.CancellationException; public class TransportGetMappingsAction extends TransportClusterInfoAction { @@ -74,9 +73,9 @@ protected void doMasterOperation( } } - private void checkCancellation(Task task) { - if (task instanceof CancellableTask && ((CancellableTask) task).isCancelled()) { - throw new CancellationException("Task cancelled"); + private static void checkCancellation(Task task) { + if (task instanceof CancellableTask) { + ((CancellableTask) task).ensureNotCancelled(); } } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java index 5a5207dc968a5..03799a57a2b72 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesAction.java @@ -15,9 +15,14 @@ import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.http.HttpChannel; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.DispatchingRestToXContentListener; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; @@ -45,6 +50,12 @@ public class RestGetIndicesAction extends BaseRestHandler { .collect(Collectors.toSet()) ); + private final ThreadPool threadPool; + + public RestGetIndicesAction(ThreadPool threadPool) { + this.threadPool = threadPool; + } + @Override public List routes() { return unmodifiableList(asList(new Route(GET, "/{index}"), new Route(HEAD, "/{index}"))); @@ -69,7 +80,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC getIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getIndexRequest.masterNodeTimeout())); getIndexRequest.humanReadable(request.paramAsBoolean("human", false)); getIndexRequest.includeDefaults(request.paramAsBoolean("include_defaults", false)); - return channel -> client.admin().indices().getIndex(getIndexRequest, new RestToXContentListener<>(channel)); + final HttpChannel httpChannel = request.getHttpChannel(); + return channel -> new RestCancellableNodeClient(client, httpChannel).admin() + .indices() + .getIndex( + getIndexRequest, + new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request).map( + r -> StatusToXContentObject.withStatus(RestStatus.OK, r) + ) + ); } /** diff --git a/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesActionTests.java index ca49df3618c96..a2b61bd2288e6 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/admin/indices/RestGetIndicesActionTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.RestActionTestCase; @@ -33,7 +34,7 @@ public void testIncludeTypeNamesWarning() throws IOException { .withParams(params) .build(); - RestGetIndicesAction handler = new RestGetIndicesAction(); + RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool()); handler.prepareRequest(request, mock(NodeClient.class)); assertWarnings(RestGetIndicesAction.TYPES_DEPRECATION_MESSAGE); @@ -53,7 +54,7 @@ public void testIncludeTypeNamesWarningExists() throws IOException { .withParams(params) .build(); - RestGetIndicesAction handler = new RestGetIndicesAction(); + RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool()); handler.prepareRequest(request, mock(NodeClient.class)); } }