Skip to content

Commit 1d5ccbe

Browse files
authored
Add support for task cancellation to TransportMasterNodeAction (#72215)
Backport of #72157
1 parent a1cc706 commit 1d5ccbe

File tree

2 files changed

+149
-4
lines changed

2 files changed

+149
-4
lines changed

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
import org.elasticsearch.common.unit.TimeValue;
3232
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3333
import org.elasticsearch.node.NodeClosedException;
34+
import org.elasticsearch.tasks.CancellableTask;
3435
import org.elasticsearch.tasks.Task;
3536
import org.elasticsearch.threadpool.ThreadPool;
3637
import org.elasticsearch.transport.ConnectTransportException;
3738
import org.elasticsearch.transport.RemoteTransportException;
3839
import org.elasticsearch.transport.TransportException;
3940
import org.elasticsearch.transport.TransportService;
4041

42+
import java.util.concurrent.CancellationException;
4143
import java.util.function.Predicate;
4244

4345
/**
@@ -88,6 +90,15 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
8890
masterOperation(request, state, listener);
8991
}
9092

93+
private void executeMasterOperation(Task task, Request request, ClusterState state,
94+
ActionListener<Response> listener) throws Exception {
95+
if (task instanceof CancellableTask && ((CancellableTask) task).isCancelled()) {
96+
throw new CancellationException("Task was cancelled");
97+
}
98+
99+
masterOperation(task, request, state, listener);
100+
}
101+
91102
protected boolean localExecute(Request request) {
92103
return false;
93104
}
@@ -120,6 +131,10 @@ class AsyncSingleAction {
120131
}
121132

122133
protected void doStart(ClusterState clusterState) {
134+
if (isTaskCancelled()) {
135+
listener.onFailure(new CancellationException("Task was cancelled"));
136+
return;
137+
}
123138
try {
124139
final DiscoveryNodes nodes = clusterState.nodes();
125140
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
@@ -152,7 +167,7 @@ protected void doStart(ClusterState clusterState) {
152167
}
153168
});
154169
threadPool.executor(executor)
155-
.execute(ActionRunnable.wrap(delegate, l -> masterOperation(task, request, clusterState, l)));
170+
.execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));
156171
}
157172
} else {
158173
if (nodes.getMasterNode() == null) {
@@ -218,8 +233,11 @@ public void onTimeout(TimeValue timeout) {
218233
actionName, timeout), failure);
219234
listener.onFailure(new MasterNotDiscoveredException(failure));
220235
}
221-
}, statePredicate
222-
);
236+
}, clusterState -> isTaskCancelled() || statePredicate.test(clusterState));
237+
}
238+
239+
private boolean isTaskCancelled() {
240+
return task instanceof CancellableTask && ((CancellableTask) task).isCancelled();
223241
}
224242
}
225243

server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.ActionRequestValidationException;
1515
import org.elasticsearch.action.ActionResponse;
1616
import org.elasticsearch.action.support.ActionFilters;
17+
import org.elasticsearch.action.support.ActionTestUtils;
1718
import org.elasticsearch.action.support.PlainActionFuture;
1819
import org.elasticsearch.action.support.ThreadedActionListener;
1920
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@@ -30,12 +31,17 @@
3031
import org.elasticsearch.cluster.service.ClusterService;
3132
import org.elasticsearch.common.io.stream.StreamInput;
3233
import org.elasticsearch.common.io.stream.StreamOutput;
34+
import org.elasticsearch.common.settings.Settings;
3335
import org.elasticsearch.common.unit.TimeValue;
36+
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
3437
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3538
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
3639
import org.elasticsearch.node.NodeClosedException;
3740
import org.elasticsearch.rest.RestStatus;
41+
import org.elasticsearch.tasks.CancellableTask;
3842
import org.elasticsearch.tasks.Task;
43+
import org.elasticsearch.tasks.TaskId;
44+
import org.elasticsearch.tasks.TaskManager;
3945
import org.elasticsearch.test.ESTestCase;
4046
import org.elasticsearch.test.transport.CapturingTransport;
4147
import org.elasticsearch.threadpool.TestThreadPool;
@@ -50,8 +56,12 @@
5056
import java.io.IOException;
5157
import java.util.Collections;
5258
import java.util.HashSet;
59+
import java.util.Map;
5360
import java.util.Objects;
5461
import java.util.Set;
62+
import java.util.concurrent.CancellationException;
63+
import java.util.concurrent.CountDownLatch;
64+
import java.util.concurrent.CyclicBarrier;
5565
import java.util.concurrent.ExecutionException;
5666
import java.util.concurrent.TimeUnit;
5767

@@ -125,6 +135,11 @@ public static class Request extends MasterNodeRequest<Request> {
125135
public ActionRequestValidationException validate() {
126136
return null;
127137
}
138+
139+
@Override
140+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
141+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
142+
}
128143
}
129144

130145
class Response extends ActionResponse {
@@ -159,12 +174,18 @@ public void writeTo(StreamOutput out) throws IOException {
159174
class Action extends TransportMasterNodeAction<Request, Response> {
160175
Action(String actionName, TransportService transportService, ClusterService clusterService,
161176
ThreadPool threadPool) {
177+
this(actionName, transportService, clusterService, threadPool, ThreadPool.Names.SAME);
178+
}
179+
180+
Action(String actionName, TransportService transportService, ClusterService clusterService,
181+
ThreadPool threadPool, String executor) {
162182
super(actionName, transportService, clusterService, threadPool,
163183
new ActionFilters(new HashSet<>()), Request::new,
164184
TestIndexNameExpressionResolver.newInstance(), Response::new,
165-
ThreadPool.Names.SAME);
185+
executor);
166186
}
167187

188+
168189
@Override
169190
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
170191
// remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER
@@ -458,4 +479,110 @@ protected void masterOperation(Request request, ClusterState state, ActionListen
458479
assertTrue(listener.isDone());
459480
assertThat(listener.get(), equalTo(response));
460481
}
482+
483+
public void testTaskCancellation() {
484+
ClusterBlock block = new ClusterBlock(1,
485+
"",
486+
true,
487+
true,
488+
false,
489+
randomFrom(RestStatus.values()),
490+
ClusterBlockLevel.ALL
491+
);
492+
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
493+
.blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
494+
495+
// Update the cluster state with a block so the request waits until it's unblocked
496+
setState(clusterService, stateWithBlock);
497+
498+
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
499+
500+
Request request = new Request();
501+
final CancellableTask task = (CancellableTask) taskManager.register("type", "internal:testAction", request);
502+
503+
boolean cancelBeforeStart = randomBoolean();
504+
if (cancelBeforeStart) {
505+
taskManager.cancel(task, "", () -> {});
506+
assertThat(task.isCancelled(), equalTo(true));
507+
}
508+
509+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
510+
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool) {
511+
@Override
512+
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
513+
Set<ClusterBlock> blocks = state.blocks().global();
514+
return blocks.isEmpty() ? null : new ClusterBlockException(blocks);
515+
}
516+
}, task, request, listener);
517+
518+
final int genericThreads = threadPool.info(ThreadPool.Names.GENERIC).getMax();
519+
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.GENERIC);
520+
final CyclicBarrier barrier = new CyclicBarrier(genericThreads + 1);
521+
final CountDownLatch latch = new CountDownLatch(1);
522+
523+
if (cancelBeforeStart == false) {
524+
assertThat(listener.isDone(), equalTo(false));
525+
526+
taskManager.cancel(task, "", () -> {});
527+
assertThat(task.isCancelled(), equalTo(true));
528+
529+
// Now that the task is cancelled, let the request to be executed
530+
final ClusterState.Builder newStateBuilder = ClusterState.builder(stateWithBlock);
531+
532+
// Either unblock the cluster state or just do an unrelated cluster state change that will check
533+
// if the task has been cancelled
534+
if (randomBoolean()) {
535+
newStateBuilder.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK);
536+
} else {
537+
newStateBuilder.incrementVersion();
538+
}
539+
setState(clusterService, newStateBuilder.build());
540+
}
541+
expectThrows(CancellationException.class, listener::actionGet);
542+
}
543+
544+
public void testTaskCancellationOnceActionItIsDispatchedToMaster() throws Exception {
545+
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
546+
547+
Request request = new Request();
548+
final CancellableTask task = (CancellableTask) taskManager.register("type", "internal:testAction", request);
549+
550+
// Block all the threads of the executor in which the master operation will be dispatched to
551+
// ensure that the master operation won't be executed until the threads are released
552+
final String executorName = ThreadPool.Names.GENERIC;
553+
final Runnable releaseBlockedThreads = blockAllThreads(executorName);
554+
555+
PlainActionFuture<Response> listener = new PlainActionFuture<>();
556+
ActionTestUtils.execute(new Action("internal:testAction", transportService, clusterService, threadPool, executorName),
557+
task,
558+
request,
559+
listener
560+
);
561+
562+
taskManager.cancel(task, "", () -> {});
563+
assertThat(task.isCancelled(), equalTo(true));
564+
565+
releaseBlockedThreads.run();
566+
567+
expectThrows(CancellationException.class, listener::actionGet);
568+
}
569+
570+
private Runnable blockAllThreads(String executorName) throws Exception {
571+
final int numberOfThreads = threadPool.info(executorName).getMax();
572+
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName);
573+
final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads + 1);
574+
final CountDownLatch latch = new CountDownLatch(1);
575+
for (int i = 0; i < numberOfThreads; i++) {
576+
executor.submit(() -> {
577+
try {
578+
barrier.await();
579+
latch.await();
580+
} catch (Exception e) {
581+
throw new AssertionError(e);
582+
}
583+
});
584+
}
585+
barrier.await();
586+
return latch::countDown;
587+
}
461588
}

0 commit comments

Comments
 (0)