diff --git a/docs/changelog/100253.yaml b/docs/changelog/100253.yaml new file mode 100644 index 0000000000000..7a9d3f3fb13d7 --- /dev/null +++ b/docs/changelog/100253.yaml @@ -0,0 +1,5 @@ +pr: 100253 +summary: Propagate cancellation in `DataTiersUsageTransportAction` +area: Data streams +type: bug +issues: [] diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/rest/action/DataTiersUsageRestCancellationIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/rest/action/DataTiersUsageRestCancellationIT.java new file mode 100644 index 0000000000000..01b4a0d163fe5 --- /dev/null +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/rest/action/DataTiersUsageRestCancellationIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.rest.action; + +import org.apache.http.client.methods.HttpGet; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.protocol.xpack.XPackUsageRequest; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.netty4.Netty4Plugin; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.action.TransportXPackUsageAction; +import org.elasticsearch.xpack.core.action.XPackUsageAction; +import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; +import org.elasticsearch.xpack.core.action.XPackUsageResponse; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener; +import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled; +import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class DataTiersUsageRestCancellationIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(getTestTransportPlugin(), DataTiersUsageOnlyXPackPlugin.class, MockTransportService.TestPlugin.class); + } + + @Override + protected Settings nodeSettings(int ordinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(ordinal, otherSettings)) + .put(NetworkModule.HTTP_DEFAULT_TYPE_SETTING.getKey(), Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) + .build(); + } + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + public void testCancellation() throws Exception { + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + + final CountDownLatch tasksBlockedLatch = new CountDownLatch(1); + final SubscribableListener nodeStatsRequestsReleaseListener = new SubscribableListener<>(); + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + ((MockTransportService) transportService).addRequestHandlingBehavior( + NodesStatsAction.NAME + "[n]", + (handler, request, channel, task) -> { + tasksBlockedLatch.countDown(); + nodeStatsRequestsReleaseListener.addListener( + ActionListener.wrap(ignored -> handler.messageReceived(request, channel, task), e -> { + throw new AssertionError("unexpected", e); + }) + ); + } + ); + } + + final Request request = new Request(HttpGet.METHOD_NAME, "/_xpack/usage"); + final PlainActionFuture future = new PlainActionFuture<>(); + final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future)); + + assertFalse(future.isDone()); + safeAwait(tasksBlockedLatch); // must wait for the node-level tasks to start to avoid cancelling being handled earlier + cancellable.cancel(); + + // NB this test works by blocking node-level stats requests; when #100230 is addressed this will need to target a different action. + assertAllCancellableTasksAreCancelled(NodesStatsAction.NAME); + assertAllCancellableTasksAreCancelled(XPackUsageAction.NAME); + + nodeStatsRequestsReleaseListener.onResponse(null); + expectThrows(CancellationException.class, future::actionGet); + + assertAllTasksHaveFinished(NodesStatsAction.NAME); + assertAllTasksHaveFinished(XPackUsageAction.NAME); + } + + public static class DataTiersUsageOnlyXPackPlugin extends LocalStateCompositeXPackPlugin { + public DataTiersUsageOnlyXPackPlugin(Settings settings, Path configPath) { + super(settings, configPath); + } + + @Override + protected Class> getUsageAction() { + return DataTiersOnlyTransportXPackUsageAction.class; + } + } + + public static class DataTiersOnlyTransportXPackUsageAction extends TransportXPackUsageAction { + @Inject + public DataTiersOnlyTransportXPackUsageAction( + ThreadPool threadPool, + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + NodeClient client + ) { + super(threadPool, transportService, clusterService, actionFilters, indexNameExpressionResolver, client); + } + + @Override + protected List usageActions() { + return List.of(XPackUsageFeatureAction.DATA_TIERS); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTiersUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTiersUsageTransportAction.java index 892135e856fc3..295df1ea51b6b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTiersUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/DataTiersUsageTransportAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -74,7 +75,7 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - client.admin() + new ParentTaskAssigningClient(client, clusterService.localNode(), task).admin() .cluster() .prepareNodesStats() .all()