1919
2020package org .elasticsearch .persistent ;
2121
22- import org .apache .logging .log4j .Logger ;
2322import org .elasticsearch .ResourceAlreadyExistsException ;
2423import org .elasticsearch .ResourceNotFoundException ;
2524import org .elasticsearch .action .ActionListener ;
3332import org .elasticsearch .common .Nullable ;
3433import org .elasticsearch .common .component .AbstractComponent ;
3534import org .elasticsearch .common .settings .Settings ;
36- import org .elasticsearch .tasks .Task ;
3735import org .elasticsearch .persistent .PersistentTasksCustomMetaData .Assignment ;
3836import org .elasticsearch .persistent .PersistentTasksCustomMetaData .PersistentTask ;
37+ import org .elasticsearch .tasks .Task ;
3938
4039import java .util .Objects ;
4140
@@ -52,29 +51,31 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
5251 this .clusterService = clusterService ;
5352 clusterService .addListener (this );
5453 this .registry = registry ;
55-
5654 }
5755
5856 /**
5957 * Creates a new persistent task on master node
6058 *
61- * @param action the action name
62- * @param params params
63- * @param listener the listener that will be called when task is started
59+ * @param taskId the task's id
60+ * @param taskName the task's name
61+ * @param taskParams the task's parameters
62+ * @param listener the listener that will be called when task is started
6463 */
65- public <Params extends PersistentTaskParams > void createPersistentTask (String taskId , String action , @ Nullable Params params ,
64+ public <Params extends PersistentTaskParams > void createPersistentTask (String taskId , String taskName , @ Nullable Params taskParams ,
6665 ActionListener <PersistentTask <?>> listener ) {
6766 clusterService .submitStateUpdateTask ("create persistent task" , new ClusterStateUpdateTask () {
6867 @ Override
69- public ClusterState execute (ClusterState currentState ) throws Exception {
68+ public ClusterState execute (ClusterState currentState ) {
7069 PersistentTasksCustomMetaData .Builder builder = builder (currentState );
7170 if (builder .hasTask (taskId )) {
7271 throw new ResourceAlreadyExistsException ("task with id {" + taskId + "} already exist" );
7372 }
74- validate (action , currentState , params );
75- final Assignment assignment ;
76- assignment = getAssignement (action , currentState , params );
77- return update (currentState , builder .addTask (taskId , action , params , assignment ));
73+
74+ PersistentTasksExecutor <Params > taskExecutor = registry .getPersistentTaskExecutorSafe (taskName );
75+ taskExecutor .validate (taskParams , currentState );
76+
77+ Assignment assignment = createAssignment (taskName , taskParams , currentState );
78+ return update (currentState , builder .addTask (taskId , taskName , taskParams , assignment ));
7879 }
7980
8081 @ Override
@@ -95,7 +96,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
9596 });
9697 }
9798
98-
9999 /**
100100 * Restarts a record about a running persistent task from cluster state
101101 *
@@ -114,7 +114,7 @@ public void completePersistentTask(String id, long allocationId, Exception failu
114114 }
115115 clusterService .submitStateUpdateTask (source , new ClusterStateUpdateTask () {
116116 @ Override
117- public ClusterState execute (ClusterState currentState ) throws Exception {
117+ public ClusterState execute (ClusterState currentState ) {
118118 PersistentTasksCustomMetaData .Builder tasksInProgress = builder (currentState );
119119 if (tasksInProgress .hasTask (id , allocationId )) {
120120 tasksInProgress .removeTask (id );
@@ -185,7 +185,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
185185 public void updatePersistentTaskStatus (String id , long allocationId , Task .Status status , ActionListener <PersistentTask <?>> listener ) {
186186 clusterService .submitStateUpdateTask ("update task status" , new ClusterStateUpdateTask () {
187187 @ Override
188- public ClusterState execute (ClusterState currentState ) throws Exception {
188+ public ClusterState execute (ClusterState currentState ) {
189189 PersistentTasksCustomMetaData .Builder tasksInProgress = builder (currentState );
190190 if (tasksInProgress .hasTask (id , allocationId )) {
191191 return update (currentState , tasksInProgress .updateTaskStatus (id , status ));
@@ -211,93 +211,85 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
211211 });
212212 }
213213
214- private <Params extends PersistentTaskParams > Assignment getAssignement (String taskName , ClusterState currentState ,
215- @ Nullable Params params ) {
216- PersistentTasksExecutor <Params > persistentTasksExecutor = registry .getPersistentTaskExecutorSafe (taskName );
217- return persistentTasksExecutor .getAssignment (params , currentState );
218- }
214+ /**
215+ * Creates a new {@link Assignment} for the given persistent task.
216+ *
217+ * @param taskName the task's name
218+ * @param taskParams the task's parameters
219+ * @param currentState the current {@link ClusterState}
219220
220- private <Params extends PersistentTaskParams > void validate (String taskName , ClusterState currentState , @ Nullable Params params ) {
221+ * @return a new {@link Assignment}
222+ */
223+ private <Params extends PersistentTaskParams > Assignment createAssignment (final String taskName ,
224+ final @ Nullable Params taskParams ,
225+ final ClusterState currentState ) {
221226 PersistentTasksExecutor <Params > persistentTasksExecutor = registry .getPersistentTaskExecutorSafe (taskName );
222- persistentTasksExecutor .validate ( params , currentState );
227+ return persistentTasksExecutor .getAssignment ( taskParams , currentState );
223228 }
224229
225230 @ Override
226231 public void clusterChanged (ClusterChangedEvent event ) {
227232 if (event .localNodeMaster ()) {
228- logger .trace ("checking task reassignment for cluster state {}" , event .state ().getVersion ());
229- if (reassignmentRequired (event , this ::getAssignement )) {
230- logger .trace ("task reassignment is needed" );
231- reassignTasks ();
232- } else {
233- logger .trace ("task reassignment is not needed" );
233+ if (shouldReassignPersistentTasks (event )) {
234+ logger .trace ("checking task reassignment for cluster state {}" , event .state ().getVersion ());
235+ clusterService .submitStateUpdateTask ("reassign persistent tasks" , new ClusterStateUpdateTask () {
236+ @ Override
237+ public ClusterState execute (ClusterState currentState ) {
238+ return reassignTasks (currentState );
239+ }
240+
241+ @ Override
242+ public void onFailure (String source , Exception e ) {
243+ logger .warn ("failed to reassign persistent tasks" , e );
244+ }
245+ });
234246 }
235247 }
236248 }
237249
238- interface ExecutorNodeDecider {
239- <Params extends PersistentTaskParams > Assignment getAssignment (String action , ClusterState currentState , Params params );
240- }
250+ /**
251+ * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
252+ * situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed.
253+ */
254+ boolean shouldReassignPersistentTasks (final ClusterChangedEvent event ) {
255+ final PersistentTasksCustomMetaData tasks = event .state ().getMetaData ().custom (PersistentTasksCustomMetaData .TYPE );
256+ if (tasks == null ) {
257+ return false ;
258+ }
241259
242- static boolean reassignmentRequired (ClusterChangedEvent event , ExecutorNodeDecider decider ) {
243- PersistentTasksCustomMetaData tasks = event .state ().getMetaData ().custom (PersistentTasksCustomMetaData .TYPE );
244- PersistentTasksCustomMetaData prevTasks = event .previousState ().getMetaData ().custom (PersistentTasksCustomMetaData .TYPE );
245- if (tasks != null && (Objects .equals (tasks , prevTasks ) == false ||
246- event .nodesChanged () ||
247- event .routingTableChanged () ||
248- event .previousState ().nodes ().isLocalNodeElectedMaster () == false )) {
249- // We need to check if removed nodes were running any of the tasks and reassign them
250- boolean reassignmentRequired = false ;
251- for (PersistentTask <?> taskInProgress : tasks .tasks ()) {
252- if (taskInProgress .needsReassignment (event .state ().nodes ())) {
253- // there is an unassigned task or task with a disappeared node - we need to try assigning it
254- if (Objects .equals (taskInProgress .getAssignment (),
255- decider .getAssignment (taskInProgress .getTaskName (), event .state (), taskInProgress .getParams ())) == false ) {
256- // it looks like a assignment for at least one task is possible - let's trigger reassignment
257- reassignmentRequired = true ;
258- break ;
259- }
260+ boolean masterChanged = event .previousState ().nodes ().isLocalNodeElectedMaster () == false ;
260261
262+ if (persistentTasksChanged (event ) || event .nodesChanged () || event .routingTableChanged () || masterChanged ) {
263+ for (PersistentTask <?> task : tasks .tasks ()) {
264+ if (needsReassignment (task .getAssignment (), event .state ().nodes ())) {
265+ Assignment assignment = createAssignment (task .getTaskName (), task .getParams (), event .state ());
266+ if (Objects .equals (assignment , task .getAssignment ()) == false ) {
267+ return true ;
268+ }
261269 }
262270 }
263- return reassignmentRequired ;
264271 }
265272 return false ;
266273 }
267274
268275 /**
269- * Evaluates the cluster state and tries to assign tasks to nodes
276+ * Evaluates the cluster state and tries to assign tasks to nodes.
277+ *
278+ * @param currentState the cluster state to analyze
279+ * @return an updated version of the cluster state
270280 */
271- public void reassignTasks () {
272- clusterService .submitStateUpdateTask ("reassign persistent tasks" , new ClusterStateUpdateTask () {
273- @ Override
274- public ClusterState execute (ClusterState currentState ) throws Exception {
275- return reassignTasks (currentState , logger , PersistentTasksClusterService .this ::getAssignement );
276- }
277-
278- @ Override
279- public void onFailure (String source , Exception e ) {
280- logger .warn ("Unsuccessful persistent task reassignment" , e );
281- }
282-
283- @ Override
284- public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
285-
286- }
287- });
288- }
289-
290- static ClusterState reassignTasks (ClusterState currentState , Logger logger , ExecutorNodeDecider decider ) {
291- PersistentTasksCustomMetaData tasks = currentState .getMetaData ().custom (PersistentTasksCustomMetaData .TYPE );
281+ ClusterState reassignTasks (final ClusterState currentState ) {
292282 ClusterState clusterState = currentState ;
293- DiscoveryNodes nodes = currentState .nodes ();
283+
284+ final PersistentTasksCustomMetaData tasks = currentState .getMetaData ().custom (PersistentTasksCustomMetaData .TYPE );
294285 if (tasks != null ) {
295286 logger .trace ("reassigning {} persistent tasks" , tasks .tasks ().size ());
287+ final DiscoveryNodes nodes = currentState .nodes ();
288+
296289 // We need to check if removed nodes were running any of the tasks and reassign them
297290 for (PersistentTask <?> task : tasks .tasks ()) {
298- if (task .needsReassignment (nodes )) {
299- // there is an unassigned task - we need to try assigning it
300- Assignment assignment = decider .getAssignment (task .getTaskName (), clusterState , task .getParams ());
291+ if (needsReassignment (task .getAssignment (), nodes )) {
292+ Assignment assignment = createAssignment (task .getTaskName (), task .getParams (), clusterState );
301293 if (Objects .equals (assignment , task .getAssignment ()) == false ) {
302294 logger .trace ("reassigning task {} from node {} to node {}" , task .getId (),
303295 task .getAssignment ().getExecutorNode (), assignment .getExecutorNode ());
@@ -313,6 +305,17 @@ static ClusterState reassignTasks(ClusterState currentState, Logger logger, Exec
313305 return clusterState ;
314306 }
315307
308+ /** Returns true if the persistent tasks are not equal between the previous and the current cluster state **/
309+ static boolean persistentTasksChanged (final ClusterChangedEvent event ) {
310+ String type = PersistentTasksCustomMetaData .TYPE ;
311+ return Objects .equals (event .state ().metaData ().custom (type ), event .previousState ().metaData ().custom (type )) == false ;
312+ }
313+
314+ /** Returns true if the task is not assigned or is assigned to a non-existing node */
315+ static boolean needsReassignment (final Assignment assignment , final DiscoveryNodes nodes ) {
316+ return (assignment .isAssigned () == false || nodes .nodeExists (assignment .getExecutorNode ()) == false );
317+ }
318+
316319 private static PersistentTasksCustomMetaData .Builder builder (ClusterState currentState ) {
317320 return PersistentTasksCustomMetaData .builder (currentState .getMetaData ().custom (PersistentTasksCustomMetaData .TYPE ));
318321 }
0 commit comments