@@ -84,14 +84,46 @@ public void testFollowIndex() throws Exception {
8484 ensureYellow (indexName1 );
8585 followIndex ("leader_cluster:" + indexName1 , indexName1 );
8686 assertBusy (() -> verifyDocuments (client (), indexName1 , numDocs ));
87+ assertThat (countCcrNodeTasks (), equalTo (5 ));
88+ assertOK (client ().performRequest ("POST" , "/_xpack/ccr/" + indexName1 + "/_unfollow" ));
89+ // Make sure that there are no other ccr relates operations running:
90+ assertBusy (() -> {
91+ Map <String , Object > clusterState = toMap (adminClient ().performRequest ("GET" , "/_cluster/state" ));
92+ List <?> tasks = (List <?>) XContentMapValues .extractValue ("metadata.persistent_tasks.tasks" , clusterState );
93+ assertThat (tasks .size (), equalTo (0 ));
94+ assertThat (countCcrNodeTasks (), equalTo (0 ));
95+ });
8796
8897 // TODO: remove mapping here when ccr syncs mappings too
8998 createIndex (indexName2 , indexSettings , "\" doc\" : { \" properties\" : { \" field\" : { \" type\" : \" long\" }}}" );
9099 ensureYellow (indexName2 );
91100 followIndex ("leader_cluster:" + indexName2 , indexName2 );
101+ // Verify that nothing has been replicated and no node tasks are running
102+ // These node tasks should have been failed due to the fact that the user
103+ // has no sufficient priviledges.
104+ assertBusy (() -> assertThat (countCcrNodeTasks (), equalTo (0 )));
105+ verifyDocuments (adminClient (), indexName2 , 0 );
92106 }
93107 }
94108
109+ private int countCcrNodeTasks () throws IOException {
110+ Map <String , Object > rsp1 = toMap (adminClient ().performRequest ("GET" , "/_tasks" ,
111+ Collections .singletonMap ("detailed" , "true" )));
112+ Map <?, ?> nodes = (Map <?, ?>) rsp1 .get ("nodes" );
113+ assertThat (nodes .size (), equalTo (1 ));
114+ Map <?, ?> node = (Map <?, ?>) nodes .values ().iterator ().next ();
115+ Map <?, ?> nodeTasks = (Map <?, ?>) node .get ("tasks" );
116+ int numNodeTasks = 0 ;
117+ for (Map .Entry <?, ?> entry : nodeTasks .entrySet ()) {
118+ Map <?, ?> nodeTask = (Map <?, ?>) entry .getValue ();
119+ String action = (String ) nodeTask .get ("action" );
120+ if (action .startsWith ("shard_follow" )) {
121+ numNodeTasks ++;
122+ }
123+ }
124+ return numNodeTasks ;
125+ }
126+
95127 private static void index (String index , String id , Object ... fields ) throws IOException {
96128 XContentBuilder document = jsonBuilder ().startObject ();
97129 for (int i = 0 ; i < fields .length ; i += 2 ) {
@@ -116,9 +148,7 @@ void verifyDocuments(RestClient client, String index, int expectedNumDocs) throw
116148 params .put ("size" , Integer .toString (expectedNumDocs ));
117149 params .put ("sort" , "field:asc" );
118150 params .put ("pretty" , "true" );
119- String strResponse = EntityUtils .toString (client .performRequest ("GET" , "/" + index + "/_search" , params ).getEntity ());
120- logger .info ("RESPONSE: {}" , strResponse );
121- Map <String , ?> response = toMap (strResponse );
151+ Map <String , ?> response = toMap (client .performRequest ("GET" , "/" + index + "/_search" , params ));
122152
123153 int numDocs = (int ) XContentMapValues .extractValue ("hits.total" , response );
124154 assertThat (numDocs , equalTo (expectedNumDocs ));
0 commit comments