@@ -252,13 +252,19 @@ void autoFollowIndices() {
252252 final int slot = i ;
253253 final String clusterAlias = entry .getKey ();
254254 final AutoFollowPattern autoFollowPattern = entry .getValue ();
255- final List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (clusterAlias );
256255
257256 getLeaderClusterState (autoFollowPattern .getHeaders (), clusterAlias , (leaderClusterState , e ) -> {
258257 if (leaderClusterState != null ) {
259258 assert e == null ;
260- Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
261- handleClusterAlias (clusterAlias , autoFollowPattern , followedIndices , leaderClusterState , resultHandler );
259+ final List <String > followedIndices = autoFollowMetadata .getFollowedLeaderIndexUUIDs ().get (clusterAlias );
260+ final List <Index > leaderIndicesToFollow =
261+ getLeaderIndicesToFollow (autoFollowPattern , leaderClusterState , followerClusterState , followedIndices );
262+ if (leaderIndicesToFollow .isEmpty ()) {
263+ finalise (slot , new AutoFollowResult (clusterAlias ));
264+ }else {
265+ Consumer <AutoFollowResult > resultHandler = result -> finalise (slot , result );
266+ checkAutoFollowPattern (clusterAlias , autoFollowPattern , leaderIndicesToFollow , resultHandler );
267+ }
262268 } else {
263269 finalise (slot , new AutoFollowResult (clusterAlias , e ));
264270 }
@@ -267,71 +273,50 @@ void autoFollowIndices() {
267273 }
268274 }
269275
270- private void handleClusterAlias (
271- String clusterAlias ,
272- AutoFollowPattern autoFollowPattern ,
273- List <String > followedIndexUUIDs ,
274- ClusterState leaderClusterState ,
275- Consumer <AutoFollowResult > resultHandler
276- ) {
277- final List <Index > leaderIndicesToFollow =
278- getLeaderIndicesToFollow (autoFollowPattern , leaderClusterState , followerClusterState , followedIndexUUIDs );
279- if (leaderIndicesToFollow .isEmpty ()) {
280- resultHandler .accept (new AutoFollowResult (clusterAlias ));
281- } else {
282- final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
283- final AtomicArray <Tuple <Index , Exception >> results = new AtomicArray <>(leaderIndicesToFollow .size ());
284- for (int i = 0 ; i < leaderIndicesToFollow .size (); i ++) {
285- final int slot = i ;
286- final Index indexToFollow = leaderIndicesToFollow .get (i );
287- final String leaderIndexName = indexToFollow .getName ();
288- final String followIndexName = getFollowerIndexName (autoFollowPattern , leaderIndexName );
289-
290- String leaderIndexNameWithClusterAliasPrefix = clusterAlias .equals ("_local_" ) ? leaderIndexName :
291- clusterAlias + ":" + leaderIndexName ;
292- FollowIndexAction .Request followRequest =
293- new FollowIndexAction .Request (leaderIndexNameWithClusterAliasPrefix , followIndexName ,
294- autoFollowPattern .getMaxBatchOperationCount (), autoFollowPattern .getMaxConcurrentReadBatches (),
295- autoFollowPattern .getMaxOperationSizeInBytes (), autoFollowPattern .getMaxConcurrentWriteBatches (),
296- autoFollowPattern .getMaxWriteBufferSize (), autoFollowPattern .getMaxRetryDelay (),
297- autoFollowPattern .getIdleShardRetryDelay ());
298-
299- // Execute if the create and follow api call succeeds:
300- Runnable successHandler = () -> {
301- LOGGER .info ("Auto followed leader index [{}] as follow index [{}]" , leaderIndexName , followIndexName );
302-
303- // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
304- // (so that we do not try to follow it in subsequent auto follow runs)
305- Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (clusterAlias , indexToFollow );
306- // The coordinator always runs on the elected master node, so we can update cluster state here:
307- updateAutoFollowMetadata (function , updateError -> {
308- assert results .get (slot ) == null ;
309- if (updateError != null ) {
310- LOGGER .error ("Failed to mark leader index [" + leaderIndexName + "] as auto followed" , updateError );
311- results .set (slot , new Tuple <>(indexToFollow , updateError ));
312- } else {
313- results .set (slot , new Tuple <>(indexToFollow , null ));
314- LOGGER .debug ("Successfully marked leader index [{}] as auto followed" , leaderIndexName );
315- }
316- if (leaderIndicesCountDown .countDown ()) {
317- resultHandler .accept (new AutoFollowResult (clusterAlias , results .asList ()));
318- }
319- });
320- };
321- // Execute if the create and follow apu call fails:
322- Consumer <Exception > failureHandler = followError -> {
323- assert followError != null ;
324- LOGGER .warn ("Failed to auto follow leader index [" + leaderIndexName + "]" , followError );
325- results .set (slot , new Tuple <>(indexToFollow , followError ));
326- if (leaderIndicesCountDown .countDown ()) {
327- resultHandler .accept (new AutoFollowResult (clusterAlias , results .asList ()));
328- }
329- };
330- createAndFollow (autoFollowPattern .getHeaders (), followRequest , successHandler , failureHandler );
331- }
276+ private void checkAutoFollowPattern (String clusterAlias , AutoFollowPattern autoFollowPattern ,
277+ List <Index > leaderIndicesToFollow , Consumer <AutoFollowResult > resultHandler ) {
278+
279+ final CountDown leaderIndicesCountDown = new CountDown (leaderIndicesToFollow .size ());
280+ final AtomicArray <Tuple <Index , Exception >> results = new AtomicArray <>(leaderIndicesToFollow .size ());
281+ for (int i = 0 ; i < leaderIndicesToFollow .size (); i ++) {
282+ final Index indexToFollow = leaderIndicesToFollow .get (i );
283+ final int slot = i ;
284+ followLeaderIndex (clusterAlias , indexToFollow , autoFollowPattern , error -> {
285+ results .set (slot , new Tuple <>(indexToFollow , error ));
286+ if (leaderIndicesCountDown .countDown ()) {
287+ resultHandler .accept (new AutoFollowResult (clusterAlias , results .asList ()));
288+ }
289+ });
332290 }
333291 }
334292
293+ private void followLeaderIndex (String clusterAlias , Index indexToFollow ,
294+ AutoFollowPattern pattern , Consumer <Exception > onResult ) {
295+ final String leaderIndexName = indexToFollow .getName ();
296+ final String followIndexName = getFollowerIndexName (pattern , leaderIndexName );
297+
298+ String leaderIndexNameWithClusterAliasPrefix = clusterAlias .equals ("_local_" ) ? leaderIndexName :
299+ clusterAlias + ":" + leaderIndexName ;
300+ FollowIndexAction .Request request =
301+ new FollowIndexAction .Request (leaderIndexNameWithClusterAliasPrefix , followIndexName ,
302+ pattern .getMaxBatchOperationCount (), pattern .getMaxConcurrentReadBatches (),
303+ pattern .getMaxOperationSizeInBytes (), pattern .getMaxConcurrentWriteBatches (),
304+ pattern .getMaxWriteBufferSize (), pattern .getMaxRetryDelay (),
305+ pattern .getIdleShardRetryDelay ());
306+
307+ // Execute if the create and follow api call succeeds:
308+ Runnable successHandler = () -> {
309+ LOGGER .info ("Auto followed leader index [{}] as follow index [{}]" , leaderIndexName , followIndexName );
310+
311+ // This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
312+ // (so that we do not try to follow it in subsequent auto follow runs)
313+ Function <ClusterState , ClusterState > function = recordLeaderIndexAsFollowFunction (clusterAlias , indexToFollow );
314+ // The coordinator always runs on the elected master node, so we can update cluster state here:
315+ updateAutoFollowMetadata (function , onResult );
316+ };
317+ createAndFollow (pattern .getHeaders (), request , successHandler , onResult );
318+ }
319+
335320 private void finalise (int slot , AutoFollowResult result ) {
336321 assert autoFollowResults .get (slot ) == null ;
337322 autoFollowResults .set (slot , result );
0 commit comments