66package org .elasticsearch .xpack .watcher ;
77
88import org .elasticsearch .Version ;
9+ import org .elasticsearch .action .ActionListener ;
10+ import org .elasticsearch .action .admin .indices .refresh .RefreshAction ;
911import org .elasticsearch .action .admin .indices .refresh .RefreshRequest ;
1012import org .elasticsearch .action .admin .indices .refresh .RefreshResponse ;
13+ import org .elasticsearch .action .search .ClearScrollAction ;
1114import org .elasticsearch .action .search .ClearScrollRequest ;
1215import org .elasticsearch .action .search .ClearScrollResponse ;
16+ import org .elasticsearch .action .search .SearchAction ;
1317import org .elasticsearch .action .search .SearchRequest ;
1418import org .elasticsearch .action .search .SearchResponse ;
1519import org .elasticsearch .action .search .SearchResponseSections ;
20+ import org .elasticsearch .action .search .SearchScrollAction ;
1621import org .elasticsearch .action .search .SearchScrollRequest ;
1722import org .elasticsearch .action .search .ShardSearchFailure ;
18- import org .elasticsearch .action .support .PlainActionFuture ;
19- import org .elasticsearch .client .AdminClient ;
2023import org .elasticsearch .client .Client ;
21- import org .elasticsearch .client .IndicesAdminClient ;
2224import org .elasticsearch .cluster .ClusterName ;
2325import org .elasticsearch .cluster .ClusterState ;
2426import org .elasticsearch .cluster .metadata .IndexMetaData ;
4244import org .elasticsearch .search .SearchShardTarget ;
4345import org .elasticsearch .test .ESTestCase ;
4446import org .elasticsearch .threadpool .ThreadPool ;
45- import org .elasticsearch .xpack .core .XPackSettings ;
4647import org .elasticsearch .xpack .core .watcher .trigger .Trigger ;
4748import org .elasticsearch .xpack .core .watcher .watch .Watch ;
4849import org .elasticsearch .xpack .core .watcher .watch .WatchStatus ;
5556import org .elasticsearch .xpack .watcher .watch .WatchParser ;
5657import org .joda .time .DateTime ;
5758import org .joda .time .DateTimeZone ;
59+ import org .junit .Before ;
5860import org .mockito .ArgumentCaptor ;
5961
6062import java .util .Collections ;
6769import static org .hamcrest .Matchers .is ;
6870import static org .mockito .Matchers .any ;
6971import static org .mockito .Matchers .eq ;
72+ import static org .mockito .Mockito .doAnswer ;
7073import static org .mockito .Mockito .mock ;
7174import static org .mockito .Mockito .never ;
7275import static org .mockito .Mockito .verify ;
@@ -76,14 +79,24 @@ public class WatcherServiceTests extends ESTestCase {
7679
7780 private final ExecutorService executorService = EsExecutors .newDirectExecutorService ();
7881
82+ private final Client client = mock (Client .class );
83+
84+ @ Before
85+ public void configureMockClient () {
86+ when (client .settings ()).thenReturn (Settings .EMPTY );
87+ ThreadPool threadPool = mock (ThreadPool .class );
88+ when (client .threadPool ()).thenReturn (threadPool );
89+ when (threadPool .getThreadContext ()).thenReturn (new ThreadContext (Settings .EMPTY ));
90+ }
91+
7992 public void testValidateStartWithClosedIndex () {
8093 TriggerService triggerService = mock (TriggerService .class );
8194 TriggeredWatchStore triggeredWatchStore = mock (TriggeredWatchStore .class );
8295 ExecutionService executionService = mock (ExecutionService .class );
8396 WatchParser parser = mock (WatchParser .class );
8497
8598 WatcherService service = new WatcherService (Settings .EMPTY , triggerService , triggeredWatchStore ,
86- executionService , parser , mock ( Client . class ) , executorService ) {
99+ executionService , parser , client , executorService ) {
87100 @ Override
88101 void stopExecutor () {
89102 }
@@ -102,18 +115,11 @@ void stopExecutor() {
102115 }
103116
104117 public void testLoadOnlyActiveWatches () throws Exception {
105- // this is just, so we dont have to add any mocking to the threadpool
106- Settings settings = Settings .builder ().put (XPackSettings .SECURITY_ENABLED .getKey (), false ).build ();
107-
108118 TriggerService triggerService = mock (TriggerService .class );
109119 TriggeredWatchStore triggeredWatchStore = mock (TriggeredWatchStore .class );
110120 ExecutionService executionService = mock (ExecutionService .class );
111121 WatchParser parser = mock (WatchParser .class );
112- Client client = mock (Client .class );
113- ThreadPool threadPool = mock (ThreadPool .class );
114- when (client .threadPool ()).thenReturn (threadPool );
115- when (threadPool .getThreadContext ()).thenReturn (new ThreadContext (Settings .EMPTY ));
116- WatcherService service = new WatcherService (settings , triggerService , triggeredWatchStore ,
122+ WatcherService service = new WatcherService (Settings .EMPTY , triggerService , triggeredWatchStore ,
117123 executionService , parser , client , executorService ) {
118124 @ Override
119125 void stopExecutor () {
@@ -150,21 +156,21 @@ void stopExecutor() {
150156 RefreshResponse refreshResponse = mock (RefreshResponse .class );
151157 when (refreshResponse .getSuccessfulShards ())
152158 .thenReturn (clusterState .getMetaData ().getIndices ().get (Watch .INDEX ).getNumberOfShards ());
153- AdminClient adminClient = mock (AdminClient .class );
154- IndicesAdminClient indicesAdminClient = mock (IndicesAdminClient .class );
155- when (client .admin ()).thenReturn (adminClient );
156- when (adminClient .indices ()).thenReturn (indicesAdminClient );
157- PlainActionFuture <RefreshResponse > refreshFuture = new PlainActionFuture <>();
158- when (indicesAdminClient .refresh (any (RefreshRequest .class ))).thenReturn (refreshFuture );
159- refreshFuture .onResponse (refreshResponse );
159+ doAnswer (invocation -> {
160+ ActionListener <RefreshResponse > listener = (ActionListener <RefreshResponse >) invocation .getArguments ()[2 ];
161+ listener .onResponse (refreshResponse );
162+ return null ;
163+ }).when (client ).execute (eq (RefreshAction .INSTANCE ), any (RefreshRequest .class ), any (ActionListener .class ));
160164
161165 // empty scroll response, no further scrolling needed
162166 SearchResponseSections scrollSearchSections = new SearchResponseSections (SearchHits .empty (), null , null , false , false , null , 1 );
163167 SearchResponse scrollSearchResponse = new SearchResponse (scrollSearchSections , "scrollId" , 1 , 1 , 0 , 10 ,
164168 ShardSearchFailure .EMPTY_ARRAY , SearchResponse .Clusters .EMPTY );
165- PlainActionFuture <SearchResponse > searchScrollResponseFuture = new PlainActionFuture <>();
166- when (client .searchScroll (any (SearchScrollRequest .class ))).thenReturn (searchScrollResponseFuture );
167- searchScrollResponseFuture .onResponse (scrollSearchResponse );
169+ doAnswer (invocation -> {
170+ ActionListener <SearchResponse > listener = (ActionListener <SearchResponse >) invocation .getArguments ()[2 ];
171+ listener .onResponse (scrollSearchResponse );
172+ return null ;
173+ }).when (client ).execute (eq (SearchScrollAction .INSTANCE ), any (SearchScrollRequest .class ), any (ActionListener .class ));
168174
169175 // one search response containing active and inactive watches
170176 int count = randomIntBetween (2 , 200 );
@@ -192,13 +198,17 @@ void stopExecutor() {
192198 SearchResponseSections sections = new SearchResponseSections (searchHits , null , null , false , false , null , 1 );
193199 SearchResponse searchResponse = new SearchResponse (sections , "scrollId" , 1 , 1 , 0 , 10 , ShardSearchFailure .EMPTY_ARRAY ,
194200 SearchResponse .Clusters .EMPTY );
195- PlainActionFuture <SearchResponse > searchResponseFuture = new PlainActionFuture <>();
196- when (client .search (any (SearchRequest .class ))).thenReturn (searchResponseFuture );
197- searchResponseFuture .onResponse (searchResponse );
198-
199- PlainActionFuture <ClearScrollResponse > clearScrollFuture = new PlainActionFuture <>();
200- when (client .clearScroll (any (ClearScrollRequest .class ))).thenReturn (clearScrollFuture );
201- clearScrollFuture .onResponse (new ClearScrollResponse (true , 1 ));
201+ doAnswer (invocation -> {
202+ ActionListener <SearchResponse > listener = (ActionListener <SearchResponse >) invocation .getArguments ()[2 ];
203+ listener .onResponse (searchResponse );
204+ return null ;
205+ }).when (client ).execute (eq (SearchAction .INSTANCE ), any (SearchRequest .class ), any (ActionListener .class ));
206+
207+ doAnswer (invocation -> {
208+ ActionListener <ClearScrollResponse > listener = (ActionListener <ClearScrollResponse >) invocation .getArguments ()[2 ];
209+ listener .onResponse (new ClearScrollResponse (true , 1 ));
210+ return null ;
211+ }).when (client ).execute (eq (ClearScrollAction .INSTANCE ), any (ClearScrollRequest .class ), any (ActionListener .class ));
202212
203213 service .start (clusterState , () -> {});
204214
@@ -228,7 +238,7 @@ public void testPausingWatcherServiceAlsoPausesTriggerService() {
228238 assertThat (triggerService .count (), is (1L ));
229239
230240 WatcherService service = new WatcherService (Settings .EMPTY , triggerService , mock (TriggeredWatchStore .class ),
231- mock (ExecutionService .class ), mock (WatchParser .class ), mock ( Client . class ) , executorService ) {
241+ mock (ExecutionService .class ), mock (WatchParser .class ), client , executorService ) {
232242 @ Override
233243 void stopExecutor () {
234244 }
@@ -245,7 +255,7 @@ public void testReloadingWatcherDoesNotPauseExecutionService() {
245255 ExecutionService executionService = mock (ExecutionService .class );
246256 TriggerService triggerService = mock (TriggerService .class );
247257 WatcherService service = new WatcherService (Settings .EMPTY , triggerService , mock (TriggeredWatchStore .class ),
248- executionService , mock (WatchParser .class ), mock ( Client . class ) , executorService ) {
258+ executionService , mock (WatchParser .class ), client , executorService ) {
249259 @ Override
250260 void stopExecutor () {
251261 }
0 commit comments