1414import org .elasticsearch .cluster .metadata .MetaData ;
1515import org .elasticsearch .cluster .service .ClusterService ;
1616import org .elasticsearch .common .bytes .BytesReference ;
17+ import org .elasticsearch .common .settings .Settings ;
1718import org .elasticsearch .common .xcontent .ToXContent ;
1819import org .elasticsearch .common .xcontent .XContentBuilder ;
1920import org .elasticsearch .common .xcontent .json .JsonXContent ;
2021import org .elasticsearch .mock .orig .Mockito ;
2122import org .elasticsearch .search .SearchHit ;
2223import org .elasticsearch .search .SearchHits ;
2324import org .elasticsearch .test .ESTestCase ;
25+ import org .elasticsearch .threadpool .FixedExecutorBuilder ;
26+ import org .elasticsearch .threadpool .ThreadPool ;
2427import org .elasticsearch .xpack .core .ml .MLMetadataField ;
2528import org .elasticsearch .xpack .core .ml .MlMetadata ;
2629import org .elasticsearch .xpack .core .ml .action .DeleteModelSnapshotAction ;
2730import org .elasticsearch .xpack .core .ml .job .config .Job ;
2831import org .elasticsearch .xpack .core .ml .job .config .JobTests ;
2932import org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
3033import org .elasticsearch .xpack .core .ml .job .process .autodetect .state .ModelSnapshot ;
34+ import org .elasticsearch .xpack .ml .MachineLearning ;
35+ import org .junit .After ;
3136import org .junit .Before ;
3237import org .mockito .invocation .InvocationOnMock ;
3338import org .mockito .stubbing .Answer ;
3843import java .util .HashMap ;
3944import java .util .List ;
4045import java .util .Map ;
46+ import java .util .concurrent .CountDownLatch ;
47+ import java .util .concurrent .TimeUnit ;
4148
4249import static org .hamcrest .Matchers .equalTo ;
50+ import static org .hamcrest .Matchers .is ;
4351import static org .mockito .Matchers .any ;
4452import static org .mockito .Matchers .same ;
4553import static org .mockito .Mockito .doAnswer ;
4654import static org .mockito .Mockito .mock ;
47- import static org .mockito .Mockito .verify ;
4855import static org .mockito .Mockito .when ;
4956
5057public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
5158
5259 private Client client ;
60+ private ThreadPool threadPool ;
5361 private ClusterService clusterService ;
5462 private ClusterState clusterState ;
5563 private List <SearchRequest > capturedSearchRequests ;
5664 private List <DeleteModelSnapshotAction .Request > capturedDeleteModelSnapshotRequests ;
5765 private List <SearchResponse > searchResponsesPerCall ;
58- private ActionListener < Boolean > listener ;
66+ private TestListener listener ;
5967
6068 @ Before
6169 public void setUpTests () {
@@ -66,7 +74,19 @@ public void setUpTests() {
6674 clusterState = mock (ClusterState .class );
6775 when (clusterService .state ()).thenReturn (clusterState );
6876 client = mock (Client .class );
69- listener = mock (ActionListener .class );
77+ listener = new TestListener ();
78+
79+ // Init thread pool
80+ Settings settings = Settings .builder ()
81+ .put ("node.name" , "expired_model_snapshots_remover_test" )
82+ .build ();
83+ threadPool = new ThreadPool (settings ,
84+ new FixedExecutorBuilder (settings , MachineLearning .UTILITY_THREAD_POOL_NAME , 1 , 1000 , "" ));
85+ }
86+
87+ @ After
88+ public void shutdownThreadPool () throws InterruptedException {
89+ terminate (threadPool );
7090 }
7191
7292 public void testRemove_GivenJobsWithoutRetentionPolicy () {
@@ -78,7 +98,8 @@ public void testRemove_GivenJobsWithoutRetentionPolicy() {
7898
7999 createExpiredModelSnapshotsRemover ().remove (listener );
80100
81- verify (listener ).onResponse (true );
101+ listener .waitToCompletion ();
102+ assertThat (listener .success , is (true ));
82103 Mockito .verifyNoMoreInteractions (client );
83104 }
84105
@@ -88,7 +109,8 @@ public void testRemove_GivenJobWithoutActiveSnapshot() {
88109
89110 createExpiredModelSnapshotsRemover ().remove (listener );
90111
91- verify (listener ).onResponse (true );
112+ listener .waitToCompletion ();
113+ assertThat (listener .success , is (true ));
92114 Mockito .verifyNoMoreInteractions (client );
93115 }
94116
@@ -108,6 +130,9 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
108130
109131 createExpiredModelSnapshotsRemover ().remove (listener );
110132
133+ listener .waitToCompletion ();
134+ assertThat (listener .success , is (true ));
135+
111136 assertThat (capturedSearchRequests .size (), equalTo (2 ));
112137 SearchRequest searchRequest = capturedSearchRequests .get (0 );
113138 assertThat (searchRequest .indices (), equalTo (new String [] {AnomalyDetectorsIndex .jobResultsAliasedName ("snapshots-1" )}));
@@ -124,8 +149,6 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
124149 deleteSnapshotRequest = capturedDeleteModelSnapshotRequests .get (2 );
125150 assertThat (deleteSnapshotRequest .getJobId (), equalTo ("snapshots-2" ));
126151 assertThat (deleteSnapshotRequest .getSnapshotId (), equalTo ("snapshots-2_1" ));
127-
128- verify (listener ).onResponse (true );
129152 }
130153
131154 public void testRemove_GivenClientSearchRequestsFail () throws IOException {
@@ -144,13 +167,14 @@ public void testRemove_GivenClientSearchRequestsFail() throws IOException {
144167
145168 createExpiredModelSnapshotsRemover ().remove (listener );
146169
170+ listener .waitToCompletion ();
171+ assertThat (listener .success , is (false ));
172+
147173 assertThat (capturedSearchRequests .size (), equalTo (1 ));
148174 SearchRequest searchRequest = capturedSearchRequests .get (0 );
149175 assertThat (searchRequest .indices (), equalTo (new String [] {AnomalyDetectorsIndex .jobResultsAliasedName ("snapshots-1" )}));
150176
151177 assertThat (capturedDeleteModelSnapshotRequests .size (), equalTo (0 ));
152-
153- verify (listener ).onFailure (any ());
154178 }
155179
156180 public void testRemove_GivenClientDeleteSnapshotRequestsFail () throws IOException {
@@ -169,6 +193,9 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
169193
170194 createExpiredModelSnapshotsRemover ().remove (listener );
171195
196+ listener .waitToCompletion ();
197+ assertThat (listener .success , is (false ));
198+
172199 assertThat (capturedSearchRequests .size (), equalTo (1 ));
173200 SearchRequest searchRequest = capturedSearchRequests .get (0 );
174201 assertThat (searchRequest .indices (), equalTo (new String [] {AnomalyDetectorsIndex .jobResultsAliasedName ("snapshots-1" )}));
@@ -177,8 +204,6 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
177204 DeleteModelSnapshotAction .Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests .get (0 );
178205 assertThat (deleteSnapshotRequest .getJobId (), equalTo ("snapshots-1" ));
179206 assertThat (deleteSnapshotRequest .getSnapshotId (), equalTo ("snapshots-1_1" ));
180-
181- verify (listener ).onFailure (any ());
182207 }
183208
184209 private void givenJobs (List <Job > jobs ) {
@@ -192,7 +217,7 @@ private void givenJobs(List<Job> jobs) {
192217 }
193218
194219 private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover () {
195- return new ExpiredModelSnapshotsRemover (client , clusterService );
220+ return new ExpiredModelSnapshotsRemover (client , threadPool , clusterService );
196221 }
197222
198223 private static ModelSnapshot createModelSnapshot (String jobId , String snapshotId ) {
@@ -230,7 +255,7 @@ private void givenClientRequests(boolean shouldSearchRequestsSucceed, boolean sh
230255 int callCount = 0 ;
231256
232257 @ Override
233- public Void answer (InvocationOnMock invocationOnMock ) throws Throwable {
258+ public Void answer (InvocationOnMock invocationOnMock ) {
234259 SearchRequest searchRequest = (SearchRequest ) invocationOnMock .getArguments ()[1 ];
235260 capturedSearchRequests .add (searchRequest );
236261 ActionListener <SearchResponse > listener = (ActionListener <SearchResponse >) invocationOnMock .getArguments ()[2 ];
@@ -244,7 +269,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
244269 }).when (client ).execute (same (SearchAction .INSTANCE ), any (), any ());
245270 doAnswer (new Answer <Void >() {
246271 @ Override
247- public Void answer (InvocationOnMock invocationOnMock ) throws Throwable {
272+ public Void answer (InvocationOnMock invocationOnMock ) {
248273 capturedDeleteModelSnapshotRequests .add ((DeleteModelSnapshotAction .Request ) invocationOnMock .getArguments ()[1 ]);
249274 ActionListener <DeleteModelSnapshotAction .Response > listener =
250275 (ActionListener <DeleteModelSnapshotAction .Response >) invocationOnMock .getArguments ()[2 ];
@@ -257,4 +282,30 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
257282 }
258283 }).when (client ).execute (same (DeleteModelSnapshotAction .INSTANCE ), any (), any ());
259284 }
285+
286+ private class TestListener implements ActionListener <Boolean > {
287+
288+ private boolean success ;
289+ private final CountDownLatch latch = new CountDownLatch (1 );
290+
291+ @ Override
292+ public void onResponse (Boolean aBoolean ) {
293+ success = aBoolean ;
294+ latch .countDown ();
295+ }
296+
297+ @ Override
298+ public void onFailure (Exception e ) {
299+ latch .countDown ();
300+ }
301+
302+ public void waitToCompletion () {
303+ try {
304+ latch .await (10 , TimeUnit .SECONDS );
305+ } catch (InterruptedException e ) {
306+ fail ("listener timed out before completing" );
307+ }
308+ }
309+ }
310+
260311}
0 commit comments