|
5 | 5 | */ |
6 | 6 | package org.elasticsearch.xpack.watcher.execution; |
7 | 7 |
|
8 | | -import org.elasticsearch.ElasticsearchException; |
9 | 8 | import org.elasticsearch.Version; |
| 9 | +import org.elasticsearch.action.ActionFuture; |
10 | 10 | import org.elasticsearch.action.ActionListener; |
| 11 | +import org.elasticsearch.action.delete.DeleteRequest; |
11 | 12 | import org.elasticsearch.action.get.GetRequest; |
12 | 13 | import org.elasticsearch.action.get.GetResponse; |
| 14 | +import org.elasticsearch.action.index.IndexRequest; |
13 | 15 | import org.elasticsearch.action.support.PlainActionFuture; |
14 | 16 | import org.elasticsearch.action.update.UpdateRequest; |
15 | 17 | import org.elasticsearch.action.update.UpdateResponse; |
|
28 | 30 | import org.elasticsearch.common.xcontent.XContentFactory; |
29 | 31 | import org.elasticsearch.common.xcontent.XContentParser; |
30 | 32 | import org.elasticsearch.common.xcontent.XContentType; |
| 33 | +import org.elasticsearch.index.Index; |
31 | 34 | import org.elasticsearch.index.IndexNotFoundException; |
| 35 | +import org.elasticsearch.index.engine.VersionConflictEngineException; |
32 | 36 | import org.elasticsearch.index.get.GetResult; |
| 37 | +import org.elasticsearch.index.shard.ShardId; |
33 | 38 | import org.elasticsearch.test.ESTestCase; |
34 | 39 | import org.elasticsearch.threadpool.ThreadPool; |
35 | 40 | import org.elasticsearch.xpack.core.security.authc.Authentication; |
|
48 | 53 | import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase; |
49 | 54 | import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; |
50 | 55 | import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch; |
| 56 | +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; |
51 | 57 | import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; |
52 | 58 | import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; |
53 | 59 | import org.elasticsearch.xpack.core.watcher.execution.Wid; |
|
91 | 97 | import static java.util.Collections.singletonMap; |
92 | 98 | import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; |
93 | 99 | import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; |
| 100 | +import static org.hamcrest.Matchers.containsString; |
94 | 101 | import static org.hamcrest.Matchers.equalTo; |
95 | 102 | import static org.hamcrest.Matchers.greaterThan; |
96 | 103 | import static org.hamcrest.Matchers.hasSize; |
@@ -844,22 +851,74 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc |
844 | 851 | when(getResponse.isExists()).thenReturn(true); |
845 | 852 | when(getResponse.getId()).thenReturn("foo"); |
846 | 853 | mockGetWatchResponse(client, "foo", getResponse); |
| 854 | + ActionFuture actionFuture = mock(ActionFuture.class); |
| 855 | + when(actionFuture.get()).thenReturn(""); |
| 856 | + when(client.index(any())).thenReturn(actionFuture); |
| 857 | + when(client.delete(any())).thenReturn(actionFuture); |
| 858 | + |
847 | 859 | when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); |
848 | 860 |
|
849 | | - // execute needs to fail as well as storing the history |
| 861 | + // execute needs to fail |
850 | 862 | doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); |
851 | | - doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); |
852 | 863 |
|
853 | 864 | Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC)); |
854 | 865 |
|
855 | 866 | TriggeredWatch triggeredWatch = new TriggeredWatch(wid, |
856 | 867 | new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC) ,ZonedDateTime.now(ZoneOffset.UTC))); |
857 | 868 | executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); |
858 | 869 |
|
859 | | - verify(triggeredWatchStore, times(1)).delete(wid); |
860 | | - ArgumentCaptor<WatchRecord> captor = ArgumentCaptor.forClass(WatchRecord.class); |
861 | | - verify(historyStore, times(1)).forcePut(captor.capture()); |
862 | | - assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION)); |
| 870 | + ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); |
| 871 | + verify(client).delete(deleteCaptor.capture()); |
| 872 | + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); |
| 873 | + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); |
| 874 | + |
| 875 | + ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); |
| 876 | + verify(client).index(watchHistoryCaptor.capture()); |
| 877 | + |
| 878 | + assertThat(watchHistoryCaptor.getValue().source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); |
| 879 | + assertThat(watchHistoryCaptor.getValue().index(), containsString(".watcher-history")); |
| 880 | + } |
| 881 | + |
| 882 | + public void testForcePutHistoryOnExecutionRejection() throws Exception { |
| 883 | + Watch watch = mock(Watch.class); |
| 884 | + when(watch.id()).thenReturn("foo"); |
| 885 | + WatchStatus status = new WatchStatus(ZonedDateTime.now(ZoneOffset.UTC), Collections.emptyMap()); |
| 886 | + when(watch.status()).thenReturn(status); |
| 887 | + GetResponse getResponse = mock(GetResponse.class); |
| 888 | + when(getResponse.isExists()).thenReturn(true); |
| 889 | + when(getResponse.getId()).thenReturn("foo"); |
| 890 | + mockGetWatchResponse(client, "foo", getResponse); |
| 891 | + ActionFuture actionFuture = mock(ActionFuture.class); |
| 892 | + when(actionFuture.get()).thenReturn(""); |
| 893 | + when(client.index(any())) |
| 894 | + .thenThrow(new VersionConflictEngineException( |
| 895 | + new ShardId(new Index("mockindex", "mockuuid"), 0), "id", "explaination")) |
| 896 | + .thenReturn(actionFuture); |
| 897 | + when(client.delete(any())).thenReturn(actionFuture); |
| 898 | + |
| 899 | + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); |
| 900 | + |
| 901 | + // execute needs to fail |
| 902 | + doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); |
| 903 | + |
| 904 | + Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC)); |
| 905 | + |
| 906 | + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, |
| 907 | + new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC))); |
| 908 | + executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); |
| 909 | + |
| 910 | + ArgumentCaptor<DeleteRequest> deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); |
| 911 | + verify(client).delete(deleteCaptor.capture()); |
| 912 | + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); |
| 913 | + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); |
| 914 | + |
| 915 | + ArgumentCaptor<IndexRequest> watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); |
| 916 | + verify(client, times(2)).index(watchHistoryCaptor.capture()); |
| 917 | + List<IndexRequest> indexRequests = watchHistoryCaptor.getAllValues(); |
| 918 | + |
| 919 | + assertThat(indexRequests.get(0).id(), equalTo(indexRequests.get(1).id())); |
| 920 | + assertThat(indexRequests.get(0).source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); |
| 921 | + assertThat(indexRequests.get(1).source().utf8ToString(), containsString(ExecutionState.EXECUTED_MULTIPLE_TIMES.toString())); |
863 | 922 | } |
864 | 923 |
|
865 | 924 | public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception { |
@@ -898,7 +957,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce |
898 | 957 | when(watch.status()).thenReturn(watchStatus); |
899 | 958 |
|
900 | 959 | executionService.execute(context); |
901 | | - verify(triggeredWatchStore, never()).delete(any()); |
| 960 | + verify(client, never()).delete(any()); |
902 | 961 | } |
903 | 962 |
|
904 | 963 | public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception { |
|
0 commit comments