diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java index 8c056d0dcb8be..98c98ca1b537b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java @@ -25,13 +25,13 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; -import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsRequest; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.core.watcher.watch.WatchField; -import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.transport.actions.WatcherTransportAction; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.joda.time.DateTime; @@ -49,83 +49,86 @@ public class TransportAckWatchAction extends WatcherTransportAction listener) { - // if the watch to be acked is running currently, reject this request - List snapshots = executionService.currentExecutions(); - boolean isWatchRunning = snapshots.stream().anyMatch(s -> s.watchId().equals(request.getWatchId())); - if (isWatchRunning) { - listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished", + WatcherStatsRequest watcherStatsRequest = new WatcherStatsRequest(); + watcherStatsRequest.includeCurrentWatches(true); + + executeAsyncWithOrigin(client, WATCHER_ORIGIN, WatcherStatsAction.INSTANCE, watcherStatsRequest, ActionListener.wrap(response -> { + boolean isWatchRunning = response.getNodes().stream() + .anyMatch(node -> node.getSnapshots().stream().anyMatch(snapshot -> snapshot.watchId().equals(request.getWatchId()))); + if (isWatchRunning) { + listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished", RestStatus.CONFLICT, request.getWatchId())); - return; - } - - GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()) - .preference(Preference.LOCAL.type()).realtime(true); - - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest, - ActionListener.wrap((response) -> { - if (response.isExists() == false) { - listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId())); - } else { - DateTime now = new DateTime(clock.millis(), UTC); - Watch watch = parser.parseWithSecrets(request.getWatchId(), true, response.getSourceAsBytesRef(), + } else { + GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()) + .preference(Preference.LOCAL.type()).realtime(true); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest, + ActionListener.wrap(getResponse -> { + if (getResponse.isExists() == false) { + listener.onFailure(new ResourceNotFoundException("Watch with id [{}] does not exist", request.getWatchId())); + } else { + DateTime now = new DateTime(clock.millis(), UTC); + Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now, XContentType.JSON); - watch.version(response.getVersion()); - watch.status().version(response.getVersion()); - String[] actionIds = request.getActionIds(); - if (actionIds == null || actionIds.length == 0) { - actionIds = new String[]{WatchField.ALL_ACTIONS_ID}; - } + watch.version(getResponse.getVersion()); + watch.status().version(getResponse.getVersion()); + String[] actionIds = request.getActionIds(); + if (actionIds == null || actionIds.length == 0) { + actionIds = new String[]{WatchField.ALL_ACTIONS_ID}; + } - // exit early in case nothing changes - boolean isChanged = watch.ack(now, actionIds); - if (isChanged == false) { - listener.onResponse(new AckWatchResponse(watch.status())); - return; - } + // exit early in case nothing changes + boolean isChanged = watch.ack(now, actionIds); + if (isChanged == false) { + listener.onResponse(new AckWatchResponse(watch.status())); + return; + } - UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()); - // this may reject this action, but prevents concurrent updates from a watch execution - updateRequest.version(response.getVersion()); - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - XContentBuilder builder = jsonBuilder(); - builder.startObject() + UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId()); + // this may reject this action, but prevents concurrent updates from a watch execution + updateRequest.version(getResponse.getVersion()); + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + XContentBuilder builder = jsonBuilder(); + builder.startObject() .startObject(WatchField.STATUS.getPreferredName()) .startObject("actions"); - List actionIdsAsList = Arrays.asList(actionIds); - boolean updateAll = actionIdsAsList.contains("_all"); - for (ActionWrapper actionWrapper : watch.actions()) { - if (updateAll || actionIdsAsList.contains(actionWrapper.id())) { - builder.startObject(actionWrapper.id()) + List actionIdsAsList = Arrays.asList(actionIds); + boolean updateAll = actionIdsAsList.contains("_all"); + for (ActionWrapper actionWrapper : watch.actions()) { + if (updateAll || actionIdsAsList.contains(actionWrapper.id())) { + builder.startObject(actionWrapper.id()) .field("ack", watch.status().actionStatus(actionWrapper.id()).ackStatus(), ToXContent.EMPTY_PARAMS) .endObject(); + } } - } - builder.endObject().endObject().endObject(); - updateRequest.doc(builder); + builder.endObject().endObject().endObject(); + updateRequest.doc(builder); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, ActionListener.wrap( - (updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())), - listener::onFailure), client::update); - } - }, listener::onFailure), client::get); + (updateResponse) -> listener.onResponse(new AckWatchResponse(watch.status())), + listener::onFailure), client::update); + } + }, listener::onFailure), client::get); + + } + + }, listener::onFailure)); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java index ea4d70b95c2e3..0121d07616065 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java @@ -6,11 +6,15 @@ package org.elasticsearch.xpack.watcher.transport.actions.ack; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -20,11 +24,13 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchRequest; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchResponse; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.xpack.core.watcher.watch.Watch; -import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.watch.WatchParser; import org.junit.Before; @@ -34,6 +40,7 @@ import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -41,7 +48,6 @@ public class TransportAckWatchActionTests extends ESTestCase { private TransportAckWatchAction action; - private ExecutionService executionService; private Client client; @Before @@ -51,11 +57,10 @@ public void setupAction() { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); when(threadPool.getThreadContext()).thenReturn(threadContext); WatchParser watchParser = mock(WatchParser.class); - executionService = mock(ExecutionService.class); client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); action = new TransportAckWatchAction(Settings.EMPTY, transportService, new ActionFilters(Collections.emptySet()), - Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, executionService, client); + Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client); } public void testWatchNotFound() { @@ -67,6 +72,13 @@ public void testWatchNotFound() { return null; }).when(client).get(anyObject(), anyObject()); + doAnswer(invocation -> { + ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2]; + listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"), new WatcherMetaData(false), + Collections.emptyList(), Collections.emptyList())); + return null; + }).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject()); + AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId); PlainActionFuture listener = PlainActionFuture.newFuture(); action.doExecute(ackWatchRequest, listener); @@ -78,9 +90,18 @@ public void testWatchNotFound() { public void testThatWatchCannotBeAckedWhileRunning() { String watchId = "my_watch_id"; - WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class); - when(snapshot.watchId()).thenReturn(watchId); - when(executionService.currentExecutions()).thenReturn(Collections.singletonList(snapshot)); + + doAnswer(invocation -> { + ContextPreservingActionListener listener = (ContextPreservingActionListener) invocation.getArguments()[2]; + DiscoveryNode discoveryNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + WatcherStatsResponse.Node node = new WatcherStatsResponse.Node(discoveryNode); + WatchExecutionSnapshot snapshot = mock(WatchExecutionSnapshot.class); + when(snapshot.watchId()).thenReturn(watchId); + node.setSnapshots(Collections.singletonList(snapshot)); + listener.onResponse(new WatcherStatsResponse(new ClusterName("clusterName"), + new WatcherMetaData(false), Collections.singletonList(node), Collections.emptyList())); + return null; + }).when(client).execute(eq(WatcherStatsAction.INSTANCE), anyObject(), anyObject()); AckWatchRequest ackWatchRequest = new AckWatchRequest(watchId); PlainActionFuture listener = PlainActionFuture.newFuture(); @@ -91,4 +112,4 @@ public void testThatWatchCannotBeAckedWhileRunning() { assertThat(e.getMessage(), is("watch[my_watch_id] is running currently, cannot ack until finished")); assertThat(e.status(), is(RestStatus.CONFLICT)); } -} \ No newline at end of file +}