@@ -66,32 +66,28 @@ public void testFollowIndex() throws Exception {
6666 refresh (allowedIndex );
6767 verifyDocuments (allowedIndex , numDocs , "*:*" );
6868 } else {
69- followIndex (client (), "leader_cluster" , allowedIndex , allowedIndex );
69+ followIndex ("leader_cluster" , allowedIndex , allowedIndex );
7070 assertBusy (() -> verifyDocuments (allowedIndex , numDocs , "*:*" ));
7171 assertThat (getCcrNodeTasks (), contains (new CcrNodeTask ("leader_cluster" , allowedIndex , allowedIndex , 0 )));
7272 assertBusy (() -> verifyCcrMonitoring (allowedIndex , allowedIndex ), 30 , TimeUnit .SECONDS );
73- assertOK ( client (). performRequest ( new Request ( "POST" , "/" + allowedIndex + "/_ccr/pause_follow" )) );
73+ pauseFollow ( allowedIndex );
7474 // Make sure that there are no other ccr relates operations running:
7575 assertBusy (() -> {
76- Map <String , Object > clusterState = toMap (adminClient ().performRequest (new Request ("GET" , "/_cluster/state" )));
77- List <?> tasks = (List <?>) XContentMapValues .extractValue ("metadata.persistent_tasks.tasks" , clusterState );
78- assertThat (tasks .size (), equalTo (0 ));
76+ assertNoPersistentTasks ();
7977 assertThat (getCcrNodeTasks (), empty ());
8078 });
8179
8280 resumeFollow (allowedIndex );
8381 assertThat (getCcrNodeTasks (), contains (new CcrNodeTask ("leader_cluster" , allowedIndex , allowedIndex , 0 )));
84- assertOK ( client (). performRequest ( new Request ( "POST" , "/" + allowedIndex + "/_ccr/pause_follow" )) );
82+ pauseFollow ( allowedIndex );
8583 // Make sure that there are no other ccr relates operations running:
8684 assertBusy (() -> {
87- Map <String , Object > clusterState = toMap (adminClient ().performRequest (new Request ("GET" , "/_cluster/state" )));
88- List <?> tasks = (List <?>) XContentMapValues .extractValue ("metadata.persistent_tasks.tasks" , clusterState );
89- assertThat (tasks .size (), equalTo (0 ));
85+ assertNoPersistentTasks ();
9086 assertThat (getCcrNodeTasks (), empty ());
9187 });
9288
93- assertOK ( client (). performRequest ( new Request ( "POST" , "/" + allowedIndex + "/_close" )) );
94- assertOK ( client (). performRequest ( new Request ( "POST" , "/" + allowedIndex + "/_ccr/unfollow" )) );
89+ closeIndex ( allowedIndex );
90+ unfollow ( allowedIndex );
9591 Exception e = expectThrows (ResponseException .class , () -> resumeFollow (allowedIndex ));
9692 assertThat (e .getMessage (), containsString ("follow index [" + allowedIndex + "] does not have ccr metadata" ));
9793
@@ -143,7 +139,8 @@ public void testFollowIndex() throws Exception {
143139 }
144140
145141 public void testAutoFollowPatterns () throws Exception {
146- assumeFalse ("Test should only run when both clusters are running" , "leader" .equals (targetCluster ));
142+ assumeTrue ("Test should only run with target_cluster=follow" , "follow" .equals (targetCluster ));
143+
147144 String allowedIndex = "logs-eu_20190101" ;
148145 String disallowedIndex = "logs-us_20190101" ;
149146
@@ -175,20 +172,21 @@ public void testAutoFollowPatterns() throws Exception {
175172 }
176173 }
177174
178- assertBusy (() -> {
179- ensureYellow (allowedIndex );
180- verifyDocuments (allowedIndex , 5 , "*:*" );
181- }, 30 , TimeUnit .SECONDS );
182- assertThat (indexExists (disallowedIndex ), is (false ));
183- assertBusy (() -> {
184- verifyCcrMonitoring (allowedIndex , allowedIndex );
185- verifyAutoFollowMonitoring ();
186- }, 30 , TimeUnit .SECONDS );
187-
188- // Cleanup by deleting auto follow pattern and pause following:
189- request = new Request ("DELETE" , "/_ccr/auto_follow/test_pattern" );
190- assertOK (client ().performRequest (request ));
191- pauseFollow (client (), allowedIndex );
175+ try {
176+ assertBusy (() -> ensureYellow (allowedIndex ), 30 , TimeUnit .SECONDS );
177+ assertBusy (() -> verifyDocuments (allowedIndex , 5 , "*:*" ), 30 , TimeUnit .SECONDS );
178+ assertThat (indexExists (disallowedIndex ), is (false ));
179+ assertBusy (() -> verifyCcrMonitoring (allowedIndex , allowedIndex ), 30 , TimeUnit .SECONDS );
180+ assertBusy (ESCCRRestTestCase ::verifyAutoFollowMonitoring , 30 , TimeUnit .SECONDS );
181+ } finally {
182+ // Cleanup by deleting auto follow pattern and pause following:
183+ try {
184+ deleteAutoFollowPattern ("test_pattern" );
185+ pauseFollow (allowedIndex );
186+ } catch (Throwable e ) {
187+ logger .warn ("Failed to cleanup after the test" , e );
188+ }
189+ }
192190 }
193191
194192 public void testForgetFollower () throws IOException {
@@ -205,7 +203,7 @@ public void testForgetFollower() throws IOException {
205203 final Response response = client ().performRequest (new Request ("GET" , "/" + forgetFollower + "/_stats" ));
206204 final String followerIndexUUID = ObjectPath .createFromResponse (response ).evaluate ("indices." + forgetFollower + ".uuid" );
207205
208- assertOK ( client (). performRequest ( new Request ( "POST" , "/" + forgetFollower + "/_ccr/pause_follow" )) );
206+ pauseFollow ( forgetFollower );
209207
210208 try (RestClient leaderClient = buildLeaderClient (restAdminSettings ())) {
211209 final Request request = new Request ("POST" , "/" + forgetLeader + "/_ccr/forget_follower" );
@@ -255,24 +253,17 @@ public void testCleanShardFollowTaskAfterDeleteFollower() throws Exception {
255253 } else {
256254 logger .info ("running against follower cluster" );
257255 followIndex (client (), "leader_cluster" , cleanLeader , cleanFollower );
258-
259- final Request request = new Request ("DELETE" , "/" + cleanFollower );
260- final Response response = client ().performRequest (request );
261- assertOK (response );
256+ deleteIndex (client (), cleanFollower );
262257 // the shard follow task should have been cleaned up on behalf of the user, see ShardFollowTaskCleaner
263258 assertBusy (() -> {
264- Map <String , Object > clusterState = toMap (adminClient ().performRequest (new Request ("GET" , "/_cluster/state" )));
265- List <?> tasks = (List <?>) XContentMapValues .extractValue ("metadata.persistent_tasks.tasks" , clusterState );
266- assertThat (tasks .size (), equalTo (0 ));
259+ assertNoPersistentTasks ();
267260 assertThat (getCcrNodeTasks (), empty ());
268261 });
269262 }
270263 }
271264
272265 public void testUnPromoteAndFollowDataStream () throws Exception {
273- if ("follow" .equals (targetCluster ) == false ) {
274- return ;
275- }
266+ assumeTrue ("Test should only run with target_cluster=follow" , "follow" .equals (targetCluster ));
276267
277268 var numDocs = 64 ;
278269 var dataStreamName = "logs-eu-monitor1" ;
@@ -282,7 +273,7 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
282273 {
283274 createAutoFollowPattern (adminClient (), "test_pattern" , "logs-eu*" , "leader_cluster" );
284275 }
285- // Create data stream and ensure that is is auto followed
276+ // Create data stream and ensure that it is auto followed
286277 {
287278 try (var leaderClient = buildLeaderClient ()) {
288279 for (var i = 0 ; i < numDocs ; i ++) {
@@ -304,11 +295,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
304295 }
305296 // promote and unfollow
306297 {
307- var promoteRequest = new Request ("POST" , "/_data_stream/_promote/" + dataStreamName );
308- assertOK (client ().performRequest (promoteRequest ));
298+ assertOK (client ().performRequest (new Request ("POST" , "/_data_stream/_promote/" + dataStreamName )));
309299 // Now that the data stream is a non replicated data stream, rollover.
310- var rolloverRequest = new Request ("POST" , "/" + dataStreamName + "/_rollover" );
311- assertOK (client ().performRequest (rolloverRequest ));
300+ assertOK (client ().performRequest (new Request ("POST" , "/" + dataStreamName + "/_rollover" )));
312301 // Unfollow .ds-logs-eu-monitor1-000001,
313302 // which is now possible because this index can now be closed as it is no longer the write index.
314303 pauseFollow (backingIndexName (dataStreamName , 1 ));
@@ -317,4 +306,9 @@ public void testUnPromoteAndFollowDataStream() throws Exception {
317306 }
318307 }
319308
309+ private static void assertNoPersistentTasks () throws IOException {
310+ Map <String , Object > clusterState = toMap (adminClient ().performRequest (new Request ("GET" , "/_cluster/state" )));
311+ List <?> tasks = (List <?>) XContentMapValues .extractValue ("metadata.persistent_tasks.tasks" , clusterState );
312+ assertThat (tasks , empty ());
313+ }
320314}
0 commit comments