2828import org .elasticsearch .action .StepListener ;
2929import org .elasticsearch .action .support .ChannelActionListener ;
3030import org .elasticsearch .action .support .GroupedActionListener ;
31- import org .elasticsearch .cluster .node .DiscoveryNode ;
3231import org .elasticsearch .common .io .stream .StreamInput ;
3332import org .elasticsearch .common .io .stream .StreamOutput ;
3433import org .elasticsearch .threadpool .ThreadPool ;
3534import org .elasticsearch .transport .EmptyTransportResponseHandler ;
35+ import org .elasticsearch .transport .Transport ;
3636import org .elasticsearch .transport .TransportChannel ;
3737import org .elasticsearch .transport .TransportException ;
3838import org .elasticsearch .transport .TransportRequest ;
39+ import org .elasticsearch .transport .TransportRequestDeduplicator ;
3940import org .elasticsearch .transport .TransportRequestHandler ;
41+ import org .elasticsearch .transport .TransportRequestOptions ;
4042import org .elasticsearch .transport .TransportResponse ;
4143import org .elasticsearch .transport .TransportService ;
4244
4345import java .io .IOException ;
4446import java .util .Collection ;
4547import java .util .List ;
48+ import java .util .Objects ;
4649
4750public class TaskCancellationService {
4851 public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban" ;
4952 private static final Logger logger = LogManager .getLogger (TaskCancellationService .class );
5053 private final TransportService transportService ;
5154 private final TaskManager taskManager ;
55+ private final TransportRequestDeduplicator <CancelRequest > deduplicator = new TransportRequestDeduplicator <>();
5256
5357 public TaskCancellationService (TransportService transportService ) {
5458 this .transportService = transportService ;
@@ -61,35 +65,63 @@ private String localNodeId() {
6165 return transportService .getLocalNode ().getId ();
6266 }
6367
64- void cancelTaskAndDescendants (CancellableTask task , String reason , boolean waitForCompletion , ActionListener <Void > listener ) {
68+ private static class CancelRequest {
69+ final CancellableTask task ;
70+ final boolean waitForCompletion ;
71+
72+ CancelRequest (CancellableTask task , boolean waitForCompletion ) {
73+ this .task = task ;
74+ this .waitForCompletion = waitForCompletion ;
75+ }
76+
77+ @ Override
78+ public boolean equals (Object o ) {
79+ if (this == o ) return true ;
80+ if (o == null || getClass () != o .getClass ()) return false ;
81+ final CancelRequest that = (CancelRequest ) o ;
82+ return waitForCompletion == that .waitForCompletion && Objects .equals (task , that .task );
83+ }
84+
85+ @ Override
86+ public int hashCode () {
87+ return Objects .hash (task , waitForCompletion );
88+ }
89+ }
90+
91+ void cancelTaskAndDescendants (CancellableTask task , String reason , boolean waitForCompletion , ActionListener <Void > finalListener ) {
92+ deduplicator .executeOnce (new CancelRequest (task , waitForCompletion ), finalListener ,
93+ (r , listener ) -> doCancelTaskAndDescendants (task , reason , waitForCompletion , listener ));
94+ }
95+
96+ void doCancelTaskAndDescendants (CancellableTask task , String reason , boolean waitForCompletion , ActionListener <Void > listener ) {
6597 final TaskId taskId = task .taskInfo (localNodeId (), false ).getTaskId ();
6698 if (task .shouldCancelChildrenOnCancellation ()) {
6799 logger .trace ("cancelling task [{}] and its descendants" , taskId );
68100 StepListener <Void > completedListener = new StepListener <>();
69101 GroupedActionListener <Void > groupedListener = new GroupedActionListener <>(completedListener .map (r -> null ), 3 );
70- Collection <DiscoveryNode > childrenNodes = taskManager .startBanOnChildrenNodes (task .getId (), () -> {
102+ Collection <Transport . Connection > childConnections = taskManager .startBanOnChildTasks (task .getId (), () -> {
71103 logger .trace ("child tasks of parent [{}] are completed" , taskId );
72104 groupedListener .onResponse (null );
73105 });
74106 taskManager .cancel (task , reason , () -> {
75107 logger .trace ("task [{}] is cancelled" , taskId );
76108 groupedListener .onResponse (null );
77109 });
78- StepListener <Void > banOnNodesListener = new StepListener <>();
79- setBanOnNodes (reason , waitForCompletion , task , childrenNodes , banOnNodesListener );
80- banOnNodesListener .whenComplete (groupedListener ::onResponse , groupedListener ::onFailure );
110+ StepListener <Void > setBanListener = new StepListener <>();
111+ setBanOnChildConnections (reason , waitForCompletion , task , childConnections , setBanListener );
112+ setBanListener .whenComplete (groupedListener ::onResponse , groupedListener ::onFailure );
81113 // If we start unbanning when the last child task completed and that child task executed with a specific user, then unban
82114 // requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context.
83115 final Runnable removeBansRunnable = transportService .getThreadPool ().getThreadContext ()
84- .preserveContext (() -> removeBanOnNodes (task , childrenNodes ));
85- // We remove bans after all child tasks are completed although in theory we can do it on a per-node basis.
116+ .preserveContext (() -> removeBanOnChildConnections (task , childConnections ));
117+ // We remove bans after all child tasks are completed although in theory we can do it on a per-connection basis.
86118 completedListener .whenComplete (r -> removeBansRunnable .run (), e -> removeBansRunnable .run ());
87- // if wait_for_completion is true, then only return when (1) bans are placed on child nodes , (2) child tasks are
88- // completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes .
119+ // if wait_for_completion is true, then only return when (1) bans are placed on child connections , (2) child tasks are
120+ // completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child connections .
89121 if (waitForCompletion ) {
90122 completedListener .whenComplete (r -> listener .onResponse (null ), listener ::onFailure );
91123 } else {
92- banOnNodesListener .whenComplete (r -> listener .onResponse (null ), listener ::onFailure );
124+ setBanListener .whenComplete (r -> listener .onResponse (null ), listener ::onFailure );
93125 }
94126 } else {
95127 logger .trace ("task [{}] doesn't have any children that should be cancelled" , taskId );
@@ -102,47 +134,48 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
102134 }
103135 }
104136
105- private void setBanOnNodes (String reason , boolean waitForCompletion , CancellableTask task ,
106- Collection <DiscoveryNode > childNodes , ActionListener <Void > listener ) {
107- if (childNodes .isEmpty ()) {
137+ private void setBanOnChildConnections (String reason , boolean waitForCompletion , CancellableTask task ,
138+ Collection <Transport . Connection > childConnections , ActionListener <Void > listener ) {
139+ if (childConnections .isEmpty ()) {
108140 listener .onResponse (null );
109141 return ;
110142 }
111143 final TaskId taskId = new TaskId (localNodeId (), task .getId ());
112- logger .trace ("cancelling child tasks of [{}] on child nodes {}" , taskId , childNodes );
113- GroupedActionListener <Void > groupedListener = new GroupedActionListener <>(listener .map (r -> null ), childNodes .size ());
144+ logger .trace ("cancelling child tasks of [{}] on child connections {}" , taskId , childConnections );
145+ GroupedActionListener <Void > groupedListener = new GroupedActionListener <>(listener .map (r -> null ), childConnections .size ());
114146 final BanParentTaskRequest banRequest = BanParentTaskRequest .createSetBanParentTaskRequest (taskId , reason , waitForCompletion );
115- for (DiscoveryNode node : childNodes ) {
116- transportService .sendRequest (node , BAN_PARENT_ACTION_NAME , banRequest ,
147+ for (Transport . Connection connection : childConnections ) {
148+ transportService .sendRequest (connection , BAN_PARENT_ACTION_NAME , banRequest , TransportRequestOptions . EMPTY ,
117149 new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
118150 @ Override
119151 public void handleResponse (TransportResponse .Empty response ) {
120- logger .trace ("sent ban for tasks with the parent [{}] to the node [{}]" , taskId , node );
152+ logger .trace ("sent ban for tasks with the parent [{}] for connection [{}]" , taskId , connection );
121153 groupedListener .onResponse (null );
122154 }
123155
124156 @ Override
125157 public void handleException (TransportException exp ) {
126158 assert ExceptionsHelper .unwrapCause (exp ) instanceof ElasticsearchSecurityException == false ;
127- logger .warn ("Cannot send ban for tasks with the parent [{}] to the node [{}]" , taskId , node );
159+ logger .warn ("Cannot send ban for tasks with the parent [{}] for connection [{}]" , taskId , connection );
128160 groupedListener .onFailure (exp );
129161 }
130162 });
131163 }
132164 }
133165
134- private void removeBanOnNodes (CancellableTask task , Collection <DiscoveryNode > childNodes ) {
166+ private void removeBanOnChildConnections (CancellableTask task , Collection <Transport . Connection > childConnections ) {
135167 final BanParentTaskRequest request =
136168 BanParentTaskRequest .createRemoveBanParentTaskRequest (new TaskId (localNodeId (), task .getId ()));
137- for (DiscoveryNode node : childNodes ) {
138- logger .trace ("Sending remove ban for tasks with the parent [{}] to the node [{}]" , request .parentTaskId , node );
139- transportService .sendRequest (node , BAN_PARENT_ACTION_NAME , request , new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
140- @ Override
141- public void handleException (TransportException exp ) {
142- assert ExceptionsHelper .unwrapCause (exp ) instanceof ElasticsearchSecurityException == false ;
143- logger .info ("failed to remove the parent ban for task {} on node {}" , request .parentTaskId , node );
144- }
145- });
169+ for (Transport .Connection connection : childConnections ) {
170+ logger .trace ("Sending remove ban for tasks with the parent [{}] for connection [{}]" , request .parentTaskId , connection );
171+ transportService .sendRequest (connection , BAN_PARENT_ACTION_NAME , request , TransportRequestOptions .EMPTY ,
172+ new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
173+ @ Override
174+ public void handleException (TransportException exp ) {
175+ assert ExceptionsHelper .unwrapCause (exp ) instanceof ElasticsearchSecurityException == false ;
176+ logger .info ("failed to remove the parent ban for task {} for connection {}" , request .parentTaskId , connection );
177+ }
178+ });
146179 }
147180 }
148181
0 commit comments