2121import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
2222import org .elasticsearch .common .util .concurrent .AtomicArray ;
2323import org .elasticsearch .discovery .MasterNotDiscoveredException ;
24+ import org .elasticsearch .persistent .PersistentTasksClusterService ;
2425import org .elasticsearch .persistent .PersistentTasksCustomMetaData ;
2526import org .elasticsearch .persistent .PersistentTasksService ;
2627import org .elasticsearch .tasks .Task ;
3435import org .elasticsearch .xpack .ml .datafeed .persistence .DatafeedConfigProvider ;
3536
3637import java .util .ArrayList ;
38+ import java .util .Collection ;
3739import java .util .HashSet ;
3840import java .util .List ;
3941import java .util .Set ;
@@ -68,32 +70,46 @@ public TransportStopDatafeedAction(TransportService transportService, ThreadPool
6870 * @param tasks Persistent task meta data
6971 * @param startedDatafeedIds Started datafeed ids are added to this list
7072 * @param stoppingDatafeedIds Stopping datafeed ids are added to this list
73+ * @param notStoppedDatafeedIds Datafeed ids are added to this list for all datafeeds that are not stopped
7174 */
72- static void sortDatafeedIdsByTaskState (Set <String > expandedDatafeedIds ,
75+ static void sortDatafeedIdsByTaskState (Collection <String > expandedDatafeedIds ,
7376 PersistentTasksCustomMetaData tasks ,
7477 List <String > startedDatafeedIds ,
75- List <String > stoppingDatafeedIds ) {
78+ List <String > stoppingDatafeedIds ,
79+ List <String > notStoppedDatafeedIds ) {
7680
7781 for (String expandedDatafeedId : expandedDatafeedIds ) {
7882 addDatafeedTaskIdAccordingToState (expandedDatafeedId , MlTasks .getDatafeedState (expandedDatafeedId , tasks ),
79- startedDatafeedIds , stoppingDatafeedIds );
83+ startedDatafeedIds , stoppingDatafeedIds , notStoppedDatafeedIds );
8084 }
8185 }
8286
8387 private static void addDatafeedTaskIdAccordingToState (String datafeedId ,
8488 DatafeedState datafeedState ,
8589 List <String > startedDatafeedIds ,
86- List <String > stoppingDatafeedIds ) {
90+ List <String > stoppingDatafeedIds ,
91+ List <String > notStoppedDatafeedIds ) {
8792 switch (datafeedState ) {
93+ case STARTING :
94+ // The STARTING state is not used anywhere at present, so this should never happen.
95+ // At present datafeeds that have a persistent task that hasn't yet been assigned
96+ // a state are reported as STOPPED (which is not great). It could be considered a
97+ // breaking change to introduce the STARTING state though, so let's aim to do it in
98+ // version 8. Also consider treating STARTING like STARTED for stop API behaviour.
99+ notStoppedDatafeedIds .add (datafeedId );
100+ break ;
88101 case STARTED :
89102 startedDatafeedIds .add (datafeedId );
103+ notStoppedDatafeedIds .add (datafeedId );
90104 break ;
91105 case STOPPED :
92106 break ;
93107 case STOPPING :
94108 stoppingDatafeedIds .add (datafeedId );
109+ notStoppedDatafeedIds .add (datafeedId );
95110 break ;
96111 default :
112+ assert false : "Unexpected datafeed state " + datafeedState ;
97113 break ;
98114 }
99115 }
@@ -118,17 +134,18 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
118134
119135 List <String > startedDatafeeds = new ArrayList <>();
120136 List <String > stoppingDatafeeds = new ArrayList <>();
121- sortDatafeedIdsByTaskState (expandedIds , tasks , startedDatafeeds , stoppingDatafeeds );
137+ List <String > notStoppedDatafeeds = new ArrayList <>();
138+ sortDatafeedIdsByTaskState (expandedIds , tasks , startedDatafeeds , stoppingDatafeeds , notStoppedDatafeeds );
122139 if (startedDatafeeds .isEmpty () && stoppingDatafeeds .isEmpty ()) {
123140 listener .onResponse (new StopDatafeedAction .Response (true ));
124141 return ;
125142 }
126143 request .setResolvedStartedDatafeedIds (startedDatafeeds .toArray (new String [startedDatafeeds .size ()]));
127144
128145 if (request .isForce ()) {
129- forceStopDatafeed (request , listener , tasks , startedDatafeeds );
146+ forceStopDatafeed (request , listener , tasks , notStoppedDatafeeds );
130147 } else {
131- normalStopDatafeed (task , request , listener , tasks , startedDatafeeds , stoppingDatafeeds );
148+ normalStopDatafeed (task , request , listener , tasks , nodes , startedDatafeeds , stoppingDatafeeds );
132149 }
133150 },
134151 listener ::onFailure
@@ -137,20 +154,20 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
137154 }
138155
139156 private void normalStopDatafeed (Task task , StopDatafeedAction .Request request , ActionListener <StopDatafeedAction .Response > listener ,
140- PersistentTasksCustomMetaData tasks ,
157+ PersistentTasksCustomMetaData tasks , DiscoveryNodes nodes ,
141158 List <String > startedDatafeeds , List <String > stoppingDatafeeds ) {
142- Set <String > executorNodes = new HashSet <>();
159+ final Set <String > executorNodes = new HashSet <>();
143160 for (String datafeedId : startedDatafeeds ) {
144161 PersistentTasksCustomMetaData .PersistentTask <?> datafeedTask = MlTasks .getDatafeedTask (datafeedId , tasks );
145162 if (datafeedTask == null ) {
146163 // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
147164 String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found." ;
148165 assert datafeedTask != null : msg ;
149166 logger .error (msg );
150- } else if (datafeedTask .isAssigned () ) {
167+ } else if (PersistentTasksClusterService . needsReassignment ( datafeedTask .getAssignment (), nodes ) == false ) {
151168 executorNodes .add (datafeedTask .getExecutorNode ());
152169 } else {
153- // This is the easy case - the datafeed is not currently assigned to a node,
170+ // This is the easy case - the datafeed is not currently assigned to a valid node,
154171 // so can be gracefully stopped simply by removing its persistent task. (Usually
155172 // a graceful stop cannot be achieved by simply removing the persistent task, but
156173 // if the datafeed has no running code then graceful/forceful are the same.)
@@ -171,48 +188,62 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A
171188
172189 ActionListener <StopDatafeedAction .Response > finalListener = ActionListener .wrap (
173190 r -> waitForDatafeedStopped (allDataFeedsToWaitFor , request , r , listener ),
174- listener ::onFailure );
191+ e -> {
192+ if (ExceptionsHelper .unwrapCause (e ) instanceof FailedNodeException ) {
193+ // A node has dropped out of the cluster since we started executing the requests.
194+ // Since stopping an already stopped datafeed is not an error we can try again.
195+ // The datafeeds that were running on the node that dropped out of the cluster
196+ // will just have their persistent tasks cancelled. Datafeeds that were stopped
197+ // by the previous attempt will be noops in the subsequent attempt.
198+ doExecute (task , request , listener );
199+ } else {
200+ listener .onFailure (e );
201+ }
202+ });
175203
176204 super .doExecute (task , request , finalListener );
177205 }
178206
179207 private void forceStopDatafeed (final StopDatafeedAction .Request request , final ActionListener <StopDatafeedAction .Response > listener ,
180- PersistentTasksCustomMetaData tasks , final List <String > startedDatafeeds ) {
208+ PersistentTasksCustomMetaData tasks , final List <String > notStoppedDatafeeds ) {
181209 final AtomicInteger counter = new AtomicInteger ();
182- final AtomicArray <Exception > failures = new AtomicArray <>(startedDatafeeds .size ());
210+ final AtomicArray <Exception > failures = new AtomicArray <>(notStoppedDatafeeds .size ());
183211
184- for (String datafeedId : startedDatafeeds ) {
212+ for (String datafeedId : notStoppedDatafeeds ) {
185213 PersistentTasksCustomMetaData .PersistentTask <?> datafeedTask = MlTasks .getDatafeedTask (datafeedId , tasks );
186214 if (datafeedTask != null ) {
187215 persistentTasksService .sendRemoveRequest (datafeedTask .getId (),
188216 new ActionListener <PersistentTasksCustomMetaData .PersistentTask <?>>() {
189217 @ Override
190218 public void onResponse (PersistentTasksCustomMetaData .PersistentTask <?> persistentTask ) {
191- if (counter .incrementAndGet () == startedDatafeeds .size ()) {
219+ if (counter .incrementAndGet () == notStoppedDatafeeds .size ()) {
192220 sendResponseOrFailure (request .getDatafeedId (), listener , failures );
193221 }
194222 }
195223
196224 @ Override
197225 public void onFailure (Exception e ) {
198226 final int slot = counter .incrementAndGet ();
199- if ((ExceptionsHelper .unwrapCause (e ) instanceof ResourceNotFoundException &&
200- Strings .isAllOrWildcard (new String []{request .getDatafeedId ()})) == false ) {
227+ // We validated that the datafeed names supplied in the request existed when we started processing the action.
228+ // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
229+ // This is not an error.
230+ if (ExceptionsHelper .unwrapCause (e ) instanceof ResourceNotFoundException == false ) {
201231 failures .set (slot - 1 , e );
202232 }
203- if (slot == startedDatafeeds .size ()) {
233+ if (slot == notStoppedDatafeeds .size ()) {
204234 sendResponseOrFailure (request .getDatafeedId (), listener , failures );
205235 }
206236 }
207237 });
208238 } else {
209- // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
239+ // This should not happen, because startedDatafeeds and stoppingDatafeeds
240+ // were derived from the same tasks that were passed to this method
210241 String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found." ;
211242 assert datafeedTask != null : msg ;
212243 logger .error (msg );
213244 final int slot = counter .incrementAndGet ();
214245 failures .set (slot - 1 , new RuntimeException (msg ));
215- if (slot == startedDatafeeds .size ()) {
246+ if (slot == notStoppedDatafeeds .size ()) {
216247 sendResponseOrFailure (request .getDatafeedId (), listener , failures );
217248 }
218249 }
@@ -313,7 +344,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
313344 .convertToElastic (failedNodeExceptions .get (0 ));
314345 } else {
315346 // This can happen when the actual task in the node no longer exists,
316- // which means the datafeed(s) have already been closed .
347+ // which means the datafeed(s) have already been stopped .
317348 return new StopDatafeedAction .Response (true );
318349 }
319350 }
0 commit comments