55 */
66package org .elasticsearch .upgrades ;
77
8+ import org .elasticsearch .Version ;
89import org .elasticsearch .client .Request ;
910import org .elasticsearch .client .ResponseException ;
1011import org .elasticsearch .client .RestClient ;
1112import org .elasticsearch .common .Strings ;
1213import org .elasticsearch .common .settings .Settings ;
14+ import org .elasticsearch .common .xcontent .ObjectPath ;
1315import org .elasticsearch .common .xcontent .support .XContentMapValues ;
1416
1517import java .io .IOException ;
@@ -88,6 +90,123 @@ public void testIndexFollowing() throws Exception {
8890 }
8991 }
9092
93+ public void testAutoFollowing () throws Exception {
94+ String leaderIndex1 = "logs-20200101" ;
95+ String leaderIndex2 = "logs-20200102" ;
96+ String leaderIndex3 = "logs-20200103" ;
97+
98+ if (clusterName == ClusterName .LEADER ) {
99+ switch (upgradeState ) {
100+ case NONE :
101+ case ONE_THIRD :
102+ case TWO_THIRD :
103+ break ;
104+ case ALL :
105+ index (leaderClient (), leaderIndex1 , 64 );
106+ assertBusy (() -> {
107+ String followerIndex = "copy-" + leaderIndex1 ;
108+ assertTotalHitCount (followerIndex , 320 , followerClient ());
109+ });
110+ index (leaderClient (), leaderIndex2 , 64 );
111+ assertBusy (() -> {
112+ String followerIndex = "copy-" + leaderIndex2 ;
113+ assertTotalHitCount (followerIndex , 256 , followerClient ());
114+ });
115+ index (leaderClient (), leaderIndex3 , 64 );
116+ assertBusy (() -> {
117+ String followerIndex = "copy-" + leaderIndex3 ;
118+ assertTotalHitCount (followerIndex , 192 , followerClient ());
119+ });
120+
121+ deleteAutoFollowPattern (followerClient (), "test_pattern" );
122+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex1 );
123+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex2 );
124+ stopIndexFollowing (followerClient (), "copy-" + leaderIndex3 );
125+ break ;
126+ default :
127+ throw new AssertionError ("unexpected upgrade_state [" + upgradeState + "]" );
128+ }
129+ } else if (clusterName == ClusterName .FOLLOWER ) {
130+ switch (upgradeState ) {
131+ case NONE :
132+ putAutoFollowPattern (followerClient (), "test_pattern" , "leader" , "logs-*" );
133+ createLeaderIndex (leaderClient (), leaderIndex1 );
134+ index (leaderClient (), leaderIndex1 , 64 );
135+ assertBusy (() -> {
136+ String followerIndex = "copy-" + leaderIndex1 ;
137+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (1 ));
138+ assertTotalHitCount (followerIndex , 64 , followerClient ());
139+ });
140+ break ;
141+ case ONE_THIRD :
142+ index (leaderClient (), leaderIndex1 , 64 );
143+ assertBusy (() -> {
144+ String followerIndex = "copy-" + leaderIndex1 ;
145+ assertTotalHitCount (followerIndex , 128 , followerClient ());
146+ });
147+ // Auto follow stats are kept in-memory on master elected node
148+ // and if this node get updated then auto follow stats are reset
149+ {
150+ int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices ();
151+ createLeaderIndex (leaderClient (), leaderIndex2 );
152+ index (leaderClient (), leaderIndex2 , 64 );
153+ assertBusy (() -> {
154+ String followerIndex = "copy-" + leaderIndex2 ;
155+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (previousNumberOfSuccessfulFollowedIndices + 1 ));
156+ assertTotalHitCount (followerIndex , 64 , followerClient ());
157+ });
158+ }
159+ break ;
160+ case TWO_THIRD :
161+ index (leaderClient (), leaderIndex1 , 64 );
162+ assertBusy (() -> {
163+ String followerIndex = "copy-" + leaderIndex1 ;
164+ assertTotalHitCount (followerIndex , 192 , followerClient ());
165+ });
166+ index (leaderClient (), leaderIndex2 , 64 );
167+ assertBusy (() -> {
168+ String followerIndex = "copy-" + leaderIndex2 ;
169+ assertTotalHitCount (followerIndex , 128 , followerClient ());
170+ });
171+
172+ // Auto follow stats are kept in-memory on master elected node
173+ // and if this node get updated then auto follow stats are reset
174+ {
175+ int previousNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices ();
176+ createLeaderIndex (leaderClient (), leaderIndex3 );
177+ index (leaderClient (), leaderIndex3 , 64 );
178+ assertBusy (() -> {
179+ String followerIndex = "copy-" + leaderIndex3 ;
180+ assertThat (getNumberOfSuccessfulFollowedIndices (), equalTo (previousNumberOfSuccessfulFollowedIndices + 1 ));
181+ assertTotalHitCount (followerIndex , 64 , followerClient ());
182+ });
183+ }
184+ break ;
185+ case ALL :
186+ index (leaderClient (), leaderIndex1 , 64 );
187+ assertBusy (() -> {
188+ String followerIndex = "copy-" + leaderIndex1 ;
189+ assertTotalHitCount (followerIndex , 256 , followerClient ());
190+ });
191+ index (leaderClient (), leaderIndex2 , 64 );
192+ assertBusy (() -> {
193+ String followerIndex = "copy-" + leaderIndex2 ;
194+ assertTotalHitCount (followerIndex , 192 , followerClient ());
195+ });
196+ index (leaderClient (), leaderIndex3 , 64 );
197+ assertBusy (() -> {
198+ String followerIndex = "copy-" + leaderIndex3 ;
199+ assertTotalHitCount (followerIndex , 128 , followerClient ());
200+ });
201+ break ;
202+ default :
203+ throw new UnsupportedOperationException ("unexpected upgrade state [" + upgradeState + "]" );
204+ }
205+ } else {
206+ throw new AssertionError ("unexpected cluster_name [" + clusterName + "]" );
207+ }
208+ }
209+
91210 public void testCannotFollowLeaderInUpgradedCluster () throws Exception {
92211 assumeTrue ("Tests only runs with upgrade_state [all]" , upgradeState == UpgradeState .ALL );
93212
@@ -113,12 +232,13 @@ public void testCannotFollowLeaderInUpgradedCluster() throws Exception {
113232 }
114233
115234 private static void createLeaderIndex (RestClient client , String indexName ) throws IOException {
116- Settings indexSettings = Settings .builder ()
117- .put ("index.soft_deletes.enabled" , true )
235+ Settings .Builder indexSettings = Settings .builder ()
118236 .put ("index.number_of_shards" , 1 )
119- .put ("index.number_of_replicas" , 0 )
120- .build ();
121- createIndex (client , indexName , indexSettings );
237+ .put ("index.number_of_replicas" , 0 );
238+ if (UPGRADE_FROM_VERSION .before (Version .V_7_0_0 ) || randomBoolean ()) {
239+ indexSettings .put ("index.soft_deletes.enabled" , true );
240+ }
241+ createIndex (client , indexName , indexSettings .build ());
122242 }
123243
124244 private static void createIndex (RestClient client , String name , Settings settings ) throws IOException {
@@ -134,6 +254,29 @@ private static void followIndex(RestClient client, String leaderCluster, String
134254 assertOK (client .performRequest (request ));
135255 }
136256
257+ private static void putAutoFollowPattern (RestClient client , String name , String remoteCluster , String pattern ) throws IOException {
258+ Request request = new Request ("PUT" , "/_ccr/auto_follow/" + name );
259+ request .setJsonEntity ("{\" leader_index_patterns\" : [\" " + pattern + "\" ], \" remote_cluster\" : \" " + remoteCluster + "\" ," +
260+ "\" follow_index_pattern\" : \" copy-{{leader_index}}\" , \" read_poll_timeout\" : \" 10ms\" }" );
261+ assertOK (client .performRequest (request ));
262+ }
263+
264+ private static void deleteAutoFollowPattern (RestClient client , String patternName ) throws IOException {
265+ Request request = new Request ("DELETE" , "/_ccr/auto_follow/" + patternName );
266+ assertOK (client .performRequest (request ));
267+ }
268+
269+ private int getNumberOfSuccessfulFollowedIndices () throws IOException {
270+ Request statsRequest = new Request ("GET" , "/_ccr/stats" );
271+ Map <?, ?> response = toMap (client ().performRequest (statsRequest ));
272+ Integer actualSuccessfulFollowedIndices = ObjectPath .eval ("auto_follow_stats.number_of_successful_follow_indices" , response );
273+ if (actualSuccessfulFollowedIndices != null ) {
274+ return actualSuccessfulFollowedIndices ;
275+ } else {
276+ return -1 ;
277+ }
278+ }
279+
137280 private static void index (RestClient client , String index , int numDocs ) throws IOException {
138281 for (int i = 0 ; i < numDocs ; i ++) {
139282 final Request request = new Request ("POST" , "/" + index + "/_doc/" );
@@ -162,4 +305,10 @@ private static void verifyTotalHitCount(final String index,
162305 assertThat (totalHits , equalTo (expectedTotalHits ));
163306 }
164307
308+ private static void stopIndexFollowing (RestClient client , String followerIndex ) throws IOException {
309+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_ccr/pause_follow" )));
310+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_close" )));
311+ assertOK (client .performRequest (new Request ("POST" , "/" + followerIndex + "/_ccr/unfollow" )));
312+ }
313+
165314}
0 commit comments