55 */
66package org .elasticsearch .xpack .ccr .action ;
77
8+ import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
89import org .elasticsearch .Version ;
910import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
11+ import org .elasticsearch .action .support .replication .ClusterStateCreationUtils ;
1012import org .elasticsearch .client .Client ;
1113import org .elasticsearch .cluster .ClusterName ;
1214import org .elasticsearch .cluster .ClusterState ;
1820import org .elasticsearch .cluster .routing .ShardRoutingState ;
1921import org .elasticsearch .cluster .routing .TestShardRouting ;
2022import org .elasticsearch .cluster .service .ClusterService ;
23+ import org .elasticsearch .common .UUIDs ;
2124import org .elasticsearch .common .collect .Tuple ;
2225import org .elasticsearch .common .settings .ClusterSettings ;
2326import org .elasticsearch .common .settings .Settings ;
2427import org .elasticsearch .common .unit .ByteSizeValue ;
2528import org .elasticsearch .common .unit .TimeValue ;
29+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
2630import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
2731import org .elasticsearch .index .Index ;
2832import org .elasticsearch .index .IndexSettings ;
4448import java .util .LinkedList ;
4549import java .util .List ;
4650import java .util .Map ;
51+ import java .util .Set ;
4752import java .util .concurrent .CountDownLatch ;
4853import java .util .concurrent .ExecutorService ;
4954import java .util .concurrent .Executors ;
5055import java .util .concurrent .atomic .AtomicInteger ;
56+ import java .util .concurrent .atomic .AtomicReference ;
5157import java .util .function .BiConsumer ;
5258import java .util .function .Consumer ;
5359import java .util .function .Function ;
5763import static org .elasticsearch .xpack .ccr .action .AutoFollowCoordinator .AutoFollower .recordLeaderIndexAsFollowFunction ;
5864import static org .hamcrest .Matchers .equalTo ;
5965import static org .hamcrest .Matchers .greaterThan ;
66+ import static org .hamcrest .Matchers .hasItem ;
6067import static org .hamcrest .Matchers .is ;
6168import static org .hamcrest .Matchers .notNullValue ;
6269import static org .hamcrest .Matchers .nullValue ;
@@ -416,6 +423,26 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() {
416423 assertThat (result .get (1 ).getName (), equalTo ("index2" ));
417424 }
418425
426+ public void testGetLeaderIndicesToFollowWithClosedIndices () {
427+ final AutoFollowPattern autoFollowPattern = new AutoFollowPattern ("remote" , Collections .singletonList ("*" ),
428+ null , null , null , null , null , null , null , null , null , null , null );
429+
430+ // index is opened
431+ ClusterState remoteState = ClusterStateCreationUtils .stateWithActivePrimary ("test-index" , true , randomIntBetween (1 , 3 ), 0 );
432+ List <Index > result = AutoFollower .getLeaderIndicesToFollow (autoFollowPattern , remoteState , Collections .emptyList ());
433+ assertThat (result .size (), equalTo (1 ));
434+ assertThat (result , hasItem (remoteState .metaData ().index ("test-index" ).getIndex ()));
435+
436+ // index is closed
437+ remoteState = ClusterState .builder (remoteState )
438+ .metaData (MetaData .builder (remoteState .metaData ())
439+ .put (IndexMetaData .builder (remoteState .metaData ().index ("test-index" )).state (IndexMetaData .State .CLOSE ).build (), true )
440+ .build ())
441+ .build ();
442+ result = AutoFollower .getLeaderIndicesToFollow (autoFollowPattern , remoteState , Collections .emptyList ());
443+ assertThat (result .size (), equalTo (0 ));
444+ }
445+
419446 public void testRecordLeaderIndexAsFollowFunction () {
420447 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (Collections .emptyMap (),
421448 Collections .singletonMap ("pattern1" , Collections .emptyList ()), Collections .emptyMap ());
@@ -763,7 +790,9 @@ void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunctio
763790 autoFollower .start ();
764791 assertThat (allResults .size (), equalTo (states .length ));
765792 for (int i = 0 ; i < states .length ; i ++) {
766- assertThat (allResults .get (i ).autoFollowExecutionResults .containsKey (new Index ("logs-" + i , "_na_" )), is (true ));
793+ final String indexName = "logs-" + i ;
794+ assertThat (allResults .get (i ).autoFollowExecutionResults .keySet ().stream ()
795+ .anyMatch (index -> index .getName ().equals (indexName )), is (true ));
767796 }
768797 }
769798
@@ -1049,6 +1078,87 @@ void updateAutoFollowMetadata(
10491078 }
10501079 }
10511080
1081+ public void testClosedIndicesAreNotAutoFollowed () {
1082+ final Client client = mock (Client .class );
1083+ when (client .getRemoteClusterClient (anyString ())).thenReturn (client );
1084+
1085+ final String pattern = "pattern1" ;
1086+ final ClusterState localState = ClusterState .builder (new ClusterName ("local" ))
1087+ .metaData (MetaData .builder ()
1088+ .putCustom (AutoFollowMetadata .TYPE ,
1089+ new AutoFollowMetadata (Collections .singletonMap (pattern ,
1090+ new AutoFollowPattern ("remote" , Collections .singletonList ("docs-*" ), null , null , null , null , null , null , null , null ,
1091+ null , null , null )),
1092+ Collections .singletonMap (pattern , Collections .emptyList ()),
1093+ Collections .singletonMap (pattern , Collections .emptyMap ()))))
1094+ .build ();
1095+
1096+ ClusterState remoteState = null ;
1097+ final int nbLeaderIndices = randomIntBetween (1 , 15 );
1098+ for (int i = 0 ; i < nbLeaderIndices ; i ++) {
1099+ String indexName = "docs-" + i ;
1100+ if (remoteState == null ) {
1101+ remoteState = createRemoteClusterState (indexName , true );
1102+ } else {
1103+ remoteState = createRemoteClusterState (remoteState , indexName );
1104+ }
1105+ if (randomBoolean ()) {
1106+ // randomly close the index
1107+ remoteState = ClusterState .builder (remoteState .getClusterName ())
1108+ .routingTable (remoteState .routingTable ())
1109+ .metaData (MetaData .builder (remoteState .metaData ())
1110+ .put (IndexMetaData .builder (remoteState .metaData ().index (indexName )).state (IndexMetaData .State .CLOSE ).build (), true )
1111+ .build ())
1112+ .build ();
1113+ }
1114+ }
1115+
1116+ final ClusterState finalRemoteState = remoteState ;
1117+ final AtomicReference <ClusterState > lastModifiedClusterState = new AtomicReference <>(localState );
1118+ final List <AutoFollowCoordinator .AutoFollowResult > results = new ArrayList <>();
1119+ final Set <Object > followedIndices = ConcurrentCollections .newConcurrentSet ();
1120+ final AutoFollower autoFollower =
1121+ new AutoFollower ("remote" , results ::addAll , localClusterStateSupplier (localState ), () -> 1L , Runnable ::run ) {
1122+ @ Override
1123+ void getRemoteClusterState (String remoteCluster ,
1124+ long metadataVersion ,
1125+ BiConsumer <ClusterStateResponse , Exception > handler ) {
1126+ assertThat (remoteCluster , equalTo ("remote" ));
1127+ handler .accept (new ClusterStateResponse (new ClusterName ("remote" ), finalRemoteState , false ), null );
1128+ }
1129+
1130+ @ Override
1131+ void createAndFollow (Map <String , String > headers ,
1132+ PutFollowAction .Request followRequest ,
1133+ Runnable successHandler ,
1134+ Consumer <Exception > failureHandler ) {
1135+ followedIndices .add (followRequest .getLeaderIndex ());
1136+ successHandler .run ();
1137+ }
1138+
1139+ @ Override
1140+ void updateAutoFollowMetadata (Function <ClusterState , ClusterState > updateFunction , Consumer <Exception > handler ) {
1141+ lastModifiedClusterState .updateAndGet (updateFunction ::apply );
1142+ handler .accept (null );
1143+ }
1144+
1145+ @ Override
1146+ void cleanFollowedRemoteIndices (ClusterState remoteClusterState , List <String > patterns ) {
1147+ // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
1148+ }
1149+ };
1150+ autoFollower .start ();
1151+
1152+ assertThat (results , notNullValue ());
1153+ assertThat (results .size (), equalTo (1 ));
1154+
1155+ for (ObjectObjectCursor <String , IndexMetaData > index : remoteState .metaData ().indices ()) {
1156+ boolean expect = index .value .getState () == IndexMetaData .State .OPEN ;
1157+ assertThat (results .get (0 ).autoFollowExecutionResults .containsKey (index .value .getIndex ()), is (expect ));
1158+ assertThat (followedIndices .contains (index .key ), is (expect ));
1159+ }
1160+ }
1161+
10521162 private static ClusterState createRemoteClusterState (String indexName , Boolean enableSoftDeletes ) {
10531163 Settings .Builder indexSettings ;
10541164 if (enableSoftDeletes != null ) {
@@ -1075,19 +1185,21 @@ private static ClusterState createRemoteClusterState(String indexName, Boolean e
10751185
10761186 private static ClusterState createRemoteClusterState (ClusterState previous , String indexName ) {
10771187 IndexMetaData indexMetaData = IndexMetaData .builder (indexName )
1078- .settings (settings (Version .CURRENT ).put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true ))
1188+ .settings (settings (Version .CURRENT )
1189+ .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true )
1190+ .put (IndexMetaData .SETTING_INDEX_UUID , UUIDs .randomBase64UUID (random ())))
10791191 .numberOfShards (1 )
10801192 .numberOfReplicas (0 )
10811193 .build ();
1082- ClusterState .Builder csBuilder = ClusterState .builder (new ClusterName ( "remote" ))
1194+ ClusterState .Builder csBuilder = ClusterState .builder (previous . getClusterName ( ))
10831195 .metaData (MetaData .builder (previous .metaData ())
10841196 .version (previous .metaData ().version () + 1 )
10851197 .put (indexMetaData , true ));
10861198
10871199 ShardRouting shardRouting =
10881200 TestShardRouting .newShardRouting (indexName , 0 , "1" , true , ShardRoutingState .INITIALIZING ).moveToStarted ();
10891201 IndexRoutingTable indexRoutingTable = IndexRoutingTable .builder (indexMetaData .getIndex ()).addShard (shardRouting ).build ();
1090- csBuilder .routingTable (RoutingTable .builder ().add (indexRoutingTable ).build ()).build ();
1202+ csBuilder .routingTable (RoutingTable .builder (previous . routingTable () ).add (indexRoutingTable ).build ()).build ();
10911203
10921204 return csBuilder .build ();
10931205 }
0 commit comments