2424import org .elasticsearch .action .TaskOperationFailure ;
2525import org .elasticsearch .action .admin .cluster .node .tasks .list .TaskInfo ;
2626import org .elasticsearch .action .support .ActionFilters ;
27- import org .elasticsearch .action .support .tasks .BaseTasksRequest ;
2827import org .elasticsearch .action .support .tasks .TransportTasksAction ;
2928import org .elasticsearch .cluster .ClusterName ;
3029import org .elasticsearch .cluster .ClusterService ;
3635import org .elasticsearch .common .io .stream .StreamOutput ;
3736import org .elasticsearch .common .settings .Settings ;
3837import org .elasticsearch .tasks .CancellableTask ;
38+ import org .elasticsearch .tasks .TaskId ;
3939import org .elasticsearch .threadpool .ThreadPool ;
4040import org .elasticsearch .transport .EmptyTransportResponseHandler ;
4141import org .elasticsearch .transport .TransportChannel ;
@@ -84,17 +84,17 @@ protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
8484 }
8585
8686 protected void processTasks (CancelTasksRequest request , Consumer <CancellableTask > operation ) {
87- if (request .taskId () != BaseTasksRequest . ALL_TASKS ) {
87+ if (request .taskId (). isSet () == false ) {
8888 // we are only checking one task, we can optimize it
89- CancellableTask task = taskManager .getCancellableTask (request .taskId ());
89+ CancellableTask task = taskManager .getCancellableTask (request .taskId (). getId () );
9090 if (task != null ) {
9191 if (request .match (task )) {
9292 operation .accept (task );
9393 } else {
9494 throw new IllegalArgumentException ("task [" + request .taskId () + "] doesn't support this operation" );
9595 }
9696 } else {
97- if (taskManager .getTask (request .taskId ()) != null ) {
97+ if (taskManager .getTask (request .taskId (). getId () ) != null ) {
9898 // The task exists, but doesn't support cancellation
9999 throw new IllegalArgumentException ("task [" + request .taskId () + "] doesn't support cancellation" );
100100 } else {
@@ -135,11 +135,14 @@ protected boolean accumulateExceptions() {
135135 }
136136
137137 private void setBanOnNodes (String reason , CancellableTask task , Set <String > nodes , BanLock banLock ) {
138- sendSetBanRequest (nodes , new BanParentTaskRequest (clusterService .localNode ().getId (), task .getId (), reason ), banLock );
138+ sendSetBanRequest (nodes ,
139+ BanParentTaskRequest .createSetBanParentTaskRequest (new TaskId (clusterService .localNode ().getId (), task .getId ()), reason ),
140+ banLock );
139141 }
140142
141143 private void removeBanOnNodes (CancellableTask task , Set <String > nodes ) {
142- sendRemoveBanRequest (nodes , new BanParentTaskRequest (clusterService .localNode ().getId (), task .getId ()));
144+ sendRemoveBanRequest (nodes ,
145+ BanParentTaskRequest .createRemoveBanParentTaskRequest (new TaskId (clusterService .localNode ().getId (), task .getId ())));
143146 }
144147
145148 private void sendSetBanRequest (Set <String > nodes , BanParentTaskRequest request , BanLock banLock ) {
@@ -148,8 +151,8 @@ private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request,
148151 DiscoveryNode discoveryNode = clusterState .getNodes ().get (node );
149152 if (discoveryNode != null ) {
150153 // Check if node still in the cluster
151- logger .debug ("Sending ban for tasks with the parent [{}:{} ] to the node [{}], ban [{}]" , request .parentNodeId , request
152- . parentTaskId , node , request .ban );
154+ logger .debug ("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]" , request .parentTaskId , node ,
155+ request .ban );
153156 transportService .sendRequest (discoveryNode , BAN_PARENT_ACTION_NAME , request ,
154157 new EmptyTransportResponseHandler (ThreadPool .Names .SAME ) {
155158 @ Override
@@ -164,8 +167,8 @@ public void handleException(TransportException exp) {
164167 });
165168 } else {
166169 banLock .onBanSet ();
167- logger .debug ("Cannot send ban for tasks with the parent [{}:{} ] to the node [{}] - the node no longer in the cluster" ,
168- request .parentNodeId , request . parentTaskId , node );
170+ logger .debug ("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster" ,
171+ request .parentTaskId , node );
169172 }
170173 }
171174 }
@@ -176,13 +179,12 @@ private void sendRemoveBanRequest(Set<String> nodes, BanParentTaskRequest reques
176179 DiscoveryNode discoveryNode = clusterState .getNodes ().get (node );
177180 if (discoveryNode != null ) {
178181 // Check if node still in the cluster
179- logger .debug ("Sending remove ban for tasks with the parent [{}:{}] to the node [{}]" , request .parentNodeId ,
180- request .parentTaskId , node );
182+ logger .debug ("Sending remove ban for tasks with the parent [{}] to the node [{}]" , request .parentTaskId , node );
181183 transportService .sendRequest (discoveryNode , BAN_PARENT_ACTION_NAME , request , EmptyTransportResponseHandler
182184 .INSTANCE_SAME );
183185 } else {
184- logger .debug ("Cannot send remove ban request for tasks with the parent [{}:{} ] to the node [{}] - the node no longer in " +
185- "the cluster" , request .parentNodeId , request . parentTaskId , node );
186+ logger .debug ("Cannot send remove ban request for tasks with the parent [{}] to the node [{}] - the node no longer in " +
187+ "the cluster" , request .parentTaskId , node );
186188 }
187189 }
188190 }
@@ -218,23 +220,27 @@ public void finish() {
218220
219221 private static class BanParentTaskRequest extends TransportRequest {
220222
221- private String parentNodeId ;
222-
223- private long parentTaskId ;
223+ private TaskId parentTaskId ;
224224
225225 private boolean ban ;
226226
227227 private String reason ;
228228
229- BanParentTaskRequest (String parentNodeId , long parentTaskId , String reason ) {
230- this .parentNodeId = parentNodeId ;
229+ static BanParentTaskRequest createSetBanParentTaskRequest (TaskId parentTaskId , String reason ) {
230+ return new BanParentTaskRequest (parentTaskId , reason );
231+ }
232+
233+ static BanParentTaskRequest createRemoveBanParentTaskRequest (TaskId parentTaskId ) {
234+ return new BanParentTaskRequest (parentTaskId );
235+ }
236+
237+ private BanParentTaskRequest (TaskId parentTaskId , String reason ) {
231238 this .parentTaskId = parentTaskId ;
232239 this .ban = true ;
233240 this .reason = reason ;
234241 }
235242
236- BanParentTaskRequest (String parentNodeId , long parentTaskId ) {
237- this .parentNodeId = parentNodeId ;
243+ private BanParentTaskRequest (TaskId parentTaskId ) {
238244 this .parentTaskId = parentTaskId ;
239245 this .ban = false ;
240246 }
@@ -245,8 +251,7 @@ public BanParentTaskRequest() {
245251 @ Override
246252 public void readFrom (StreamInput in ) throws IOException {
247253 super .readFrom (in );
248- parentNodeId = in .readString ();
249- parentTaskId = in .readLong ();
254+ parentTaskId = new TaskId (in );
250255 ban = in .readBoolean ();
251256 if (ban ) {
252257 reason = in .readString ();
@@ -256,8 +261,7 @@ public void readFrom(StreamInput in) throws IOException {
256261 @ Override
257262 public void writeTo (StreamOutput out ) throws IOException {
258263 super .writeTo (out );
259- out .writeString (parentNodeId );
260- out .writeLong (parentTaskId );
264+ parentTaskId .writeTo (out );
261265 out .writeBoolean (ban );
262266 if (ban ) {
263267 out .writeString (reason );
@@ -269,13 +273,13 @@ class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRe
269273 @ Override
270274 public void messageReceived (final BanParentTaskRequest request , final TransportChannel channel ) throws Exception {
271275 if (request .ban ) {
272- logger .debug ("Received ban for the parent [{}:{} ] on the node [{}], reason: [{}]" , request .parentNodeId , request
273- . parentTaskId , clusterService .localNode ().getId (), request .reason );
274- taskManager .setBan (request .parentNodeId , request . parentTaskId , request .reason );
276+ logger .debug ("Received ban for the parent [{}] on the node [{}], reason: [{}]" , request .parentTaskId ,
277+ clusterService .localNode ().getId (), request .reason );
278+ taskManager .setBan (request .parentTaskId , request .reason );
275279 } else {
276- logger .debug ("Removing ban for the parent [{}:{} ] on the node [{}]" , request . parentNodeId , request .parentTaskId ,
280+ logger .debug ("Removing ban for the parent [{}] on the node [{}]" , request .parentTaskId ,
277281 clusterService .localNode ().getId ());
278- taskManager .removeBan (request .parentNodeId , request . parentTaskId );
282+ taskManager .removeBan (request .parentTaskId );
279283 }
280284 channel .sendResponse (TransportResponse .Empty .INSTANCE );
281285 }
0 commit comments