|
29 | 29 | import org.elasticsearch.xpack.core.ml.MlTasks; |
30 | 30 | import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; |
31 | 31 | import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; |
32 | | -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; |
33 | 32 | import org.elasticsearch.xpack.ml.MachineLearning; |
34 | 33 | import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; |
35 | 34 |
|
@@ -104,7 +103,7 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi |
104 | 103 | final DiscoveryNodes nodes = state.nodes(); |
105 | 104 | if (nodes.isLocalNodeElectedMaster() == false) { |
106 | 105 | // Delegates stop datafeed to elected master node, so it becomes the coordinating node. |
107 | | - // See comment in StartDatafeedAction.Transport class for more information. |
| 106 | + // See comment in TransportStartDatafeedAction for more information. |
108 | 107 | if (nodes.getMasterNode() == null) { |
109 | 108 | listener.onFailure(new MasterNotDiscoveredException("no known master node")); |
110 | 109 | } else { |
@@ -142,13 +141,21 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A |
142 | 141 | Set<String> executorNodes = new HashSet<>(); |
143 | 142 | for (String datafeedId : startedDatafeeds) { |
144 | 143 | PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); |
145 | | - if (datafeedTask == null || datafeedTask.isAssigned() == false) { |
146 | | - String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." + |
147 | | - " Use force stop to stop the datafeed"; |
148 | | - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); |
149 | | - return; |
150 | | - } else { |
| 144 | + if (datafeedTask == null) { |
| 145 | + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method |
| 146 | + String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found."; |
| 147 | + assert datafeedTask != null : msg; |
| 148 | + logger.error(msg); |
| 149 | + } else if (datafeedTask.isAssigned()) { |
151 | 150 | executorNodes.add(datafeedTask.getExecutorNode()); |
| 151 | + } else { |
| 152 | + // This is the easy case - the datafeed is not currently assigned to a node, |
| 153 | + // so can be gracefully stopped simply by removing its persistent task. (Usually |
| 154 | + // a graceful stop cannot be achieved by simply removing the persistent task, but |
| 155 | + // if the datafeed has no running code then graceful/forceful are the same.) |
| 156 | + // The listener here can be a no-op, as waitForDatafeedStopped() already waits for |
| 157 | + // these persistent tasks to disappear. |
| 158 | + persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {})); |
152 | 159 | } |
153 | 160 | } |
154 | 161 |
|
@@ -198,9 +205,10 @@ public void onFailure(Exception e) { |
198 | 205 | } |
199 | 206 | }); |
200 | 207 | } else { |
201 | | - String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + |
202 | | - "datafeed's task could not be found."; |
203 | | - logger.warn(msg); |
| 208 | + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method |
| 209 | + String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found."; |
| 210 | + assert datafeedTask != null : msg; |
| 211 | + logger.error(msg); |
204 | 212 | final int slot = counter.incrementAndGet(); |
205 | 213 | failures.set(slot - 1, new RuntimeException(msg)); |
206 | 214 | if (slot == startedDatafeeds.size()) { |
@@ -248,19 +256,18 @@ protected void doRun() throws Exception { |
248 | 256 |
|
249 | 257 | private void sendResponseOrFailure(String datafeedId, ActionListener<StopDatafeedAction.Response> listener, |
250 | 258 | AtomicArray<Exception> failures) { |
251 | | - List<Exception> catchedExceptions = failures.asList(); |
252 | | - if (catchedExceptions.size() == 0) { |
| 259 | + List<Exception> caughtExceptions = failures.asList(); |
| 260 | + if (caughtExceptions.size() == 0) { |
253 | 261 | listener.onResponse(new StopDatafeedAction.Response(true)); |
254 | 262 | return; |
255 | 263 | } |
256 | 264 |
|
257 | | - String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() |
| 265 | + String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size() |
258 | 266 | + "] failures, rethrowing last, all Exceptions: [" |
259 | | - + catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) |
| 267 | + + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) |
260 | 268 | + "]"; |
261 | 269 |
|
262 | | - ElasticsearchException e = new ElasticsearchException(msg, |
263 | | - catchedExceptions.get(0)); |
| 270 | + ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0)); |
264 | 271 | listener.onFailure(e); |
265 | 272 | } |
266 | 273 |
|
|
0 commit comments