2020package org .elasticsearch .action .admin .cluster .node .tasks .cancel ;
2121
2222import org .elasticsearch .ResourceNotFoundException ;
23+ import org .elasticsearch .Version ;
2324import org .elasticsearch .action .ActionListener ;
2425import org .elasticsearch .action .FailedNodeException ;
2526import org .elasticsearch .action .StepListener ;
2627import org .elasticsearch .action .TaskOperationFailure ;
2728import org .elasticsearch .action .support .ActionFilters ;
29+ import org .elasticsearch .action .support .ChannelActionListener ;
2830import org .elasticsearch .action .support .GroupedActionListener ;
2931import org .elasticsearch .action .support .tasks .TransportTasksAction ;
3032import org .elasticsearch .cluster .node .DiscoveryNode ;
@@ -104,34 +106,43 @@ protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask
104106 @ Override
105107 protected void taskOperation (CancelTasksRequest request , CancellableTask cancellableTask , ActionListener <TaskInfo > listener ) {
106108 String nodeId = clusterService .localNode ().getId ();
107- if (cancellableTask .shouldCancelChildrenOnCancellation ()) {
109+ cancelTaskAndDescendants (cancellableTask , request .getReason (), request .waitForCompletion (),
110+ ActionListener .map (listener , r -> cancellableTask .taskInfo (nodeId , false )));
111+ }
112+
113+ void cancelTaskAndDescendants (CancellableTask task , String reason , boolean waitForCompletion , ActionListener <Void > listener ) {
114+ if (task .shouldCancelChildrenOnCancellation ()) {
108115 StepListener <Void > completedListener = new StepListener <>();
109116 GroupedActionListener <Void > groupedListener = new GroupedActionListener <>(ActionListener .map (completedListener , r -> null ), 3 );
110117 Collection <DiscoveryNode > childrenNodes =
111- taskManager .startBanOnChildrenNodes (cancellableTask .getId (), () -> groupedListener .onResponse (null ));
112- taskManager .cancel (cancellableTask , request . getReason () , () -> groupedListener .onResponse (null ));
118+ taskManager .startBanOnChildrenNodes (task .getId (), () -> groupedListener .onResponse (null ));
119+ taskManager .cancel (task , reason , () -> groupedListener .onResponse (null ));
113120
114121 StepListener <Void > banOnNodesListener = new StepListener <>();
115- setBanOnNodes (request . getReason (), cancellableTask , childrenNodes , banOnNodesListener );
122+ setBanOnNodes (reason , waitForCompletion , task , childrenNodes , banOnNodesListener );
116123 banOnNodesListener .whenComplete (groupedListener ::onResponse , groupedListener ::onFailure );
117124 // We remove bans after all child tasks are completed although in theory we can do it on a per-node basis.
118- completedListener .whenComplete (
119- r -> removeBanOnNodes (cancellableTask , childrenNodes ),
120- e -> removeBanOnNodes (cancellableTask , childrenNodes ));
121- // if wait_for_child_tasks is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
125+ completedListener .whenComplete (r -> removeBanOnNodes (task , childrenNodes ), e -> removeBanOnNodes (task , childrenNodes ));
126+ // if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
122127 // completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes.
123- if (request . waitForCompletion () ) {
124- completedListener .whenComplete (r -> listener .onResponse (cancellableTask . taskInfo ( nodeId , false ) ), listener ::onFailure );
128+ if (waitForCompletion ) {
129+ completedListener .whenComplete (r -> listener .onResponse (null ), listener ::onFailure );
125130 } else {
126- banOnNodesListener .whenComplete (r -> listener .onResponse (cancellableTask . taskInfo ( nodeId , false ) ), listener ::onFailure );
131+ banOnNodesListener .whenComplete (r -> listener .onResponse (null ), listener ::onFailure );
127132 }
128133 } else {
129- logger .trace ("task {} doesn't have any children that should be cancelled" , cancellableTask .getId ());
130- taskManager .cancel (cancellableTask , request .getReason (), () -> listener .onResponse (cancellableTask .taskInfo (nodeId , false )));
134+ logger .trace ("task {} doesn't have any children that should be cancelled" , task .getId ());
135+ if (waitForCompletion ) {
136+ taskManager .cancel (task , reason , () -> listener .onResponse (null ));
137+ } else {
138+ taskManager .cancel (task , reason , () -> {});
139+ listener .onResponse (null );
140+ }
131141 }
132142 }
133143
134- private void setBanOnNodes (String reason , CancellableTask task , Collection <DiscoveryNode > childNodes , ActionListener <Void > listener ) {
144+ private void setBanOnNodes (String reason , boolean waitForCompletion , CancellableTask task ,
145+ Collection <DiscoveryNode > childNodes , ActionListener <Void > listener ) {
135146 if (childNodes .isEmpty ()) {
136147 listener .onResponse (null );
137148 return ;
@@ -140,7 +151,7 @@ private void setBanOnNodes(String reason, CancellableTask task, Collection<Disco
140151 GroupedActionListener <Void > groupedListener =
141152 new GroupedActionListener <>(ActionListener .map (listener , r -> null ), childNodes .size ());
142153 final BanParentTaskRequest banRequest = BanParentTaskRequest .createSetBanParentTaskRequest (
143- new TaskId (clusterService .localNode ().getId (), task .getId ()), reason );
154+ new TaskId (clusterService .localNode ().getId (), task .getId ()), reason , waitForCompletion );
144155 for (DiscoveryNode node : childNodes ) {
145156 transportService .sendRequest (node , BAN_PARENT_ACTION_NAME , banRequest ,
146157 new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
@@ -171,33 +182,41 @@ private static class BanParentTaskRequest extends TransportRequest {
171182
172183 private final TaskId parentTaskId ;
173184 private final boolean ban ;
185+ private final boolean waitForCompletion ;
174186 private final String reason ;
175187
176- static BanParentTaskRequest createSetBanParentTaskRequest (TaskId parentTaskId , String reason ) {
177- return new BanParentTaskRequest (parentTaskId , reason );
188+ static BanParentTaskRequest createSetBanParentTaskRequest (TaskId parentTaskId , String reason , boolean waitForCompletion ) {
189+ return new BanParentTaskRequest (parentTaskId , reason , waitForCompletion );
178190 }
179191
180192 static BanParentTaskRequest createRemoveBanParentTaskRequest (TaskId parentTaskId ) {
181193 return new BanParentTaskRequest (parentTaskId );
182194 }
183195
184- private BanParentTaskRequest (TaskId parentTaskId , String reason ) {
196+ private BanParentTaskRequest (TaskId parentTaskId , String reason , boolean waitForCompletion ) {
185197 this .parentTaskId = parentTaskId ;
186198 this .ban = true ;
187199 this .reason = reason ;
200+ this .waitForCompletion = waitForCompletion ;
188201 }
189202
190203 private BanParentTaskRequest (TaskId parentTaskId ) {
191204 this .parentTaskId = parentTaskId ;
192205 this .ban = false ;
193206 this .reason = null ;
207+ this .waitForCompletion = false ;
194208 }
195209
196210 private BanParentTaskRequest (StreamInput in ) throws IOException {
197211 super (in );
198212 parentTaskId = TaskId .readFromStream (in );
199213 ban = in .readBoolean ();
200214 reason = ban ? in .readString () : null ;
215+ if (in .getVersion ().onOrAfter (Version .V_8_0_0 )) {
216+ waitForCompletion = in .readBoolean ();
217+ } else {
218+ waitForCompletion = false ;
219+ }
201220 }
202221
203222 @ Override
@@ -208,6 +227,9 @@ public void writeTo(StreamOutput out) throws IOException {
208227 if (ban ) {
209228 out .writeString (reason );
210229 }
230+ if (out .getVersion ().onOrAfter (Version .V_8_0_0 )) {
231+ out .writeBoolean (waitForCompletion );
232+ }
211233 }
212234 }
213235
@@ -217,13 +239,20 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC
217239 if (request .ban ) {
218240 logger .debug ("Received ban for the parent [{}] on the node [{}], reason: [{}]" , request .parentTaskId ,
219241 clusterService .localNode ().getId (), request .reason );
220- taskManager .setBan (request .parentTaskId , request .reason );
242+ final List <CancellableTask > childTasks = taskManager .setBan (request .parentTaskId , request .reason );
243+ final GroupedActionListener <Void > listener = new GroupedActionListener <>(ActionListener .map (
244+ new ChannelActionListener <>(channel , BAN_PARENT_ACTION_NAME , request ), r -> TransportResponse .Empty .INSTANCE ),
245+ childTasks .size () + 1 );
246+ for (CancellableTask childTask : childTasks ) {
247+ cancelTaskAndDescendants (childTask , request .reason , request .waitForCompletion , listener );
248+ }
249+ listener .onResponse (null );
221250 } else {
222251 logger .debug ("Removing ban for the parent [{}] on the node [{}]" , request .parentTaskId ,
223252 clusterService .localNode ().getId ());
224253 taskManager .removeBan (request .parentTaskId );
254+ channel .sendResponse (TransportResponse .Empty .INSTANCE );
225255 }
226- channel .sendResponse (TransportResponse .Empty .INSTANCE );
227256 }
228257 }
229258
0 commit comments