1212import org .elasticsearch .client .RestClient ;
1313import org .elasticsearch .common .Strings ;
1414import org .elasticsearch .common .settings .Settings ;
15+ import org .elasticsearch .common .xcontent .ObjectPath ;
1516import org .elasticsearch .common .xcontent .support .XContentMapValues ;
1617import org .elasticsearch .rest .RestStatus ;
1718
@@ -90,6 +91,123 @@ public void testIndexFollowing() throws Exception {
9091 }
9192 }
9293
94+ public void testAutoFollowing () throws Exception {
95+ String leaderIndex1 = "logs-20200101" ;
96+ String leaderIndex2 = "logs-20200102" ;
97+ String leaderIndex3 = "logs-20200103" ;
98+
99+ if (clusterName == ClusterName .LEADER ) {
100+ switch (upgradeState ) {
101+ case NONE :
102+ case ONE_THIRD :
103+ case TWO_THIRD :
104+ break ;
105+ case ALL :
106+ index (leaderClient (), leaderIndex1 , 64 );
107+ assertBusy (() -> {
108+ String followerIndex = "copy-" + leaderIndex1 ;
109+ assertTotalHitCount (followerIndex , 320 , followerClient ());
110+ });
111+ index (leaderClient (), leaderIndex2 , 64 );
112+ assertBusy (() -> {
113+ String followerIndex = "copy-" + leaderIndex2 ;
114+ assertTotalHitCount (followerIndex , 256 , followerClient ());
115+ });
116+ index (leaderClient (), leaderIndex3 , 64 );
117+ assertBusy (() -> {
118+ String followerIndex = "copy-" + leaderIndex3 ;
119+ assertTotalHitCount (followerIndex , 192 , followerClient ());
120+ });
121+
122+ deleteAutoFollowPattern (followerClient (), "test_pattern" );
123+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex1 );
124+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex2 );
125+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex3 );
126+ break ;
127+ default :
128+ throw new AssertionError ("unexpected upgrade_state [" + upgradeState + "]" );
129+ }
130+ } else if (clusterName == ClusterName .FOLLOWER ) {
131+ switch (upgradeState ) {
132+ case NONE :
133+ putAutoFollowPattern (followerClient (), "test_pattern" , "leader" , "logs-*" );
134+ createLeaderIndex (leaderClient (), leaderIndex1 );
135+ index (leaderClient (), leaderIndex1 , 64 );
136+ assertBusy (() -> {
137+ String followerIndex = "copy-" + leaderIndex1 ;
138+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (1 ));
139+ assertTotalHitCount (followerIndex , 64 , followerClient ());
140+ });
141+ break ;
142+ case ONE_THIRD :
143+ index (leaderClient (), leaderIndex1 , 64 );
144+ assertBusy (() -> {
145+ String followerIndex = "copy-" + leaderIndex1 ;
146+ assertTotalHitCount (followerIndex , 128 , followerClient ());
147+ });
148+ // Auto follow stats are kept in-memory on master elected node
149+ // and if this node get updated then auto follow stats are reset
150+ {
151+ int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices ();
152+ createLeaderIndex (leaderClient (), leaderIndex2 );
153+ index (leaderClient (), leaderIndex2 , 64 );
154+ assertBusy (() -> {
155+ String followerIndex = "copy-" + leaderIndex2 ;
156+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (previousNumberOfSuccessfulFollowedIndices + 1 ));
157+ assertTotalHitCount (followerIndex , 64 , followerClient ());
158+ });
159+ }
160+ break ;
161+ case TWO_THIRD :
162+ index (leaderClient (), leaderIndex1 , 64 );
163+ assertBusy (() -> {
164+ String followerIndex = "copy-" + leaderIndex1 ;
165+ assertTotalHitCount (followerIndex , 192 , followerClient ());
166+ });
167+ index (leaderClient (), leaderIndex2 , 64 );
168+ assertBusy (() -> {
169+ String followerIndex = "copy-" + leaderIndex2 ;
170+ assertTotalHitCount (followerIndex , 128 , followerClient ());
171+ });
172+
173+ // Auto follow stats are kept in-memory on master elected node
174+ // and if this node get updated then auto follow stats are reset
175+ {
176+ int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices ();
177+ createLeaderIndex (leaderClient (), leaderIndex3 );
178+ index (leaderClient (), leaderIndex3 , 64 );
179+ assertBusy (() -> {
180+ String followerIndex = "copy-" + leaderIndex3 ;
181+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (previousNumberOfSuccessfulFollowedIndices + 1 ));
182+ assertTotalHitCount (followerIndex , 64 , followerClient ());
183+ });
184+ }
185+ break ;
186+ case ALL :
187+ index (leaderClient (), leaderIndex1 , 64 );
188+ assertBusy (() -> {
189+ String followerIndex = "copy-" + leaderIndex1 ;
190+ assertTotalHitCount (followerIndex , 256 , followerClient ());
191+ });
192+ index (leaderClient (), leaderIndex2 , 64 );
193+ assertBusy (() -> {
194+ String followerIndex = "copy-" + leaderIndex2 ;
195+ assertTotalHitCount (followerIndex , 192 , followerClient ());
196+ });
197+ index (leaderClient (), leaderIndex3 , 64 );
198+ assertBusy (() -> {
199+ String followerIndex = "copy-" + leaderIndex3 ;
200+ assertTotalHitCount (followerIndex , 128 , followerClient ());
201+ });
202+ break ;
203+ default :
204+ throw new UnsupportedOperationException ("unexpected upgrade state [" + upgradeState + "]" );
205+ }
206+ } else {
207+ throw new AssertionError ("unexpected cluster_name [" + clusterName + "]" );
208+ }
209+ }
210+
93211 public void testCannotFollowLeaderInUpgradedCluster () throws Exception {
94212 assumeTrue ("Tests only runs with upgrade_state [all]" , upgradeState == UpgradeState .ALL );
95213 assumeTrue ("Put follow api does not restore from ccr repository before 6.7.0" ,
@@ -117,12 +235,11 @@ public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
117235 }
118236
119237 private static void createLeaderIndex (RestClient client , String indexName ) throws IOException {
120- Settings indexSettings = Settings .builder ()
238+ Settings . Builder indexSettings = Settings .builder ()
121239 .put ("index.soft_deletes.enabled" , true )
122240 .put ("index.number_of_shards" , 1 )
123- .put ("index.number_of_replicas" , 0 )
124- .build ();
125- createIndex (client , indexName , indexSettings );
241+ .put ("index.number_of_replicas" , 0 );
242+ createIndex (client , indexName , indexSettings .build ());
126243 }
127244
128245 private static void createIndex (RestClient client , String name , Settings settings ) throws IOException {
@@ -138,6 +255,29 @@ private static void followIndex(RestClient client, String leaderCluster, String
138255 assertOK (client .performRequest (request ));
139256 }
140257
258+ private static void putAutoFollowPattern (RestClient client , String name , String remoteCluster , String pattern ) throws IOException {
259+ Request request = new Request ("PUT" , "/_ccr/auto_follow/" + name );
260+ request .setJsonEntity ("{\" leader_index_patterns\" : [\" " + pattern + "\" ], \" remote_cluster\" : \" " + remoteCluster + "\" ," +
261+ "\" follow_index_pattern\" : \" copy-{{leader_index}}\" , \" read_poll_timeout\" : \" 10ms\" }" );
262+ assertOK (client .performRequest (request ));
263+ }
264+
265+ private static void deleteAutoFollowPattern (RestClient client , String patternName ) throws IOException {
266+ Request request = new Request ("DELETE" , "/_ccr/auto_follow/" + patternName );
267+ assertOK (client .performRequest (request ));
268+ }
269+
270+ private int getNumberOfSuccessfulFollowedIndices () throws IOException {
271+ Request statsRequest = new Request ("GET" , "/_ccr/stats" );
272+ Map <?, ?> response = toMap (client ().performRequest (statsRequest ));
273+ Integer actualSuccessfulFollowedIndices = ObjectPath .eval ("auto_follow_stats.number_of_successful_follow_indices" , response );
274+ if (actualSuccessfulFollowedIndices != null ) {
275+ return actualSuccessfulFollowedIndices ;
276+ } else {
277+ return -1 ;
278+ }
279+ }
280+
141281 private static void index (RestClient client , String index , int numDocs ) throws IOException {
142282 for (int i = 0 ; i < numDocs ; i ++) {
143283 final Request request = new Request ("POST" , "/" + index + "/_doc/" );
@@ -171,4 +311,10 @@ private static void verifyTotalHitCount(final String index,
171311 assertThat (totalHits , equalTo (expectedTotalHits ));
172312 }
173313
314+ private static void stopIndexFollowing (RestClient client , String followerIndex ) throws IOException {
315+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_ccr/pause_follow" )));
316+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_close" )));
317+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_ccr/unfollow" )));
318+ }
319+
174320}
0 commit comments