|
34 | 34 | import java.util.HashMap; |
35 | 35 | import java.util.List; |
36 | 36 | import java.util.Map; |
| 37 | +import java.util.function.Function; |
37 | 38 | import java.util.stream.Collectors; |
38 | 39 |
|
39 | 40 | import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; |
@@ -177,14 +178,14 @@ public void testWatcherStats() throws Exception { |
177 | 178 | } |
178 | 179 |
|
179 | 180 | public void testWatcherRestart() throws Exception { |
180 | | - assumeFalse("Seems to be broken in mixed clusters. Skipping while I debug.", CLUSTER_TYPE == ClusterType.MIXED); |
181 | 181 | executeUpgradeIfNeeded(); |
182 | 182 |
|
183 | 183 | executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_stop"))); |
184 | 184 | ensureWatcherStopped(); |
185 | 185 |
|
186 | 186 | executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_start"))); |
187 | | - ensureWatcherStarted(); |
| 187 | + // Watcher should be started on at least the nodes with the new version. |
| 188 | + ensureWatcherStartedOnModernNodes(); |
188 | 189 | } |
189 | 190 |
|
190 | 191 | public void testWatchCrudApis() throws Exception { |
@@ -314,6 +315,30 @@ private void ensureWatcherStarted() throws Exception { |
314 | 315 | })); |
315 | 316 | } |
316 | 317 |
|
| 318 | + private void ensureWatcherStartedOnModernNodes() throws Exception { |
| 319 | + if (nodes.getMaster().getVersion().before(Version.V_6_0_0)) { |
| 320 | + /* |
| 321 | + * Versions before 6.0 ran watcher on the master node and the |
| 322 | + * logic in ensureWatcherStarted is fine. |
| 323 | + */ |
| 324 | + ensureWatcherStarted(); |
| 325 | + return; |
| 326 | + } |
| 327 | + executeAgainstMasterNode(client -> assertBusy(() -> { |
| 328 | + Map<?, ?> responseBody = entityAsMap(client.performRequest("GET", "_xpack/watcher/stats")); |
| 329 | + logger.info("ensureWatcherStartedOnModernNodes(), stats response [{}]", responseBody); |
| 330 | + Map<?, ?> stats = ((List<?>) responseBody.get("stats")).stream() |
| 331 | + .map(o -> (Map<?, ?>) o) |
| 332 | + .collect(Collectors.toMap(m -> m.get("node_id"), Function.identity())); |
| 333 | + assertNotNull("no stats yet", stats); |
| 334 | + for (Node node : nodes.getNewNodes()) { |
| 335 | + Map<?, ?> nodeStats = (Map<?, ?>) stats.get(node.getId()); |
| 336 | + assertEquals("modern node [" + node.getId() + "] is not started", |
| 337 | + nodeStats.get("watcher_state"), "started"); |
| 338 | + } |
| 339 | + })); |
| 340 | + } |
| 341 | + |
317 | 342 | private Nodes buildNodeAndVersions() throws IOException { |
318 | 343 | Response response = client().performRequest("GET", "_nodes"); |
319 | 344 | ObjectPath objectPath = ObjectPath.createFromResponse(response); |
|
0 commit comments