2525import org .elasticsearch .cluster .service .ClusterService ;
2626import org .elasticsearch .common .collect .CopyOnWriteHashMap ;
2727import org .elasticsearch .common .collect .Tuple ;
28+ import org .elasticsearch .common .component .AbstractLifecycleComponent ;
29+ import org .elasticsearch .common .component .Lifecycle ;
2830import org .elasticsearch .common .settings .Settings ;
2931import org .elasticsearch .common .unit .TimeValue ;
3032import org .elasticsearch .common .util .concurrent .AtomicArray ;
6567 * A component that runs only on the elected master node and follows leader indices automatically
6668 * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
6769 */
68- public class AutoFollowCoordinator implements ClusterStateListener {
70+ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ClusterStateListener {
6971
7072 private static final Logger LOGGER = LogManager .getLogger (AutoFollowCoordinator .class );
7173 private static final int MAX_AUTO_FOLLOW_ERRORS = 256 ;
@@ -90,7 +92,7 @@ public AutoFollowCoordinator(
9092 ClusterService clusterService ,
9193 CcrLicenseChecker ccrLicenseChecker ,
9294 LongSupplier relativeMillisTimeProvider ) {
93-
95+ super ( settings );
9496 this .client = client ;
9597 this .clusterService = clusterService ;
9698 this .ccrLicenseChecker = Objects .requireNonNull (ccrLicenseChecker , "ccrLicenseChecker" );
@@ -113,6 +115,26 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
113115 waitForMetadataTimeOut = CcrSettings .CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT .get (settings );
114116 }
115117
118+ @ Override
119+ protected void doStart () {
120+
121+ }
122+
123+ @ Override
124+ protected void doStop () {
125+ LOGGER .trace ("stopping all auto-followers" );
126+ /*
127+ * Synchronization is not necessary here; the field is volatile and the map is a copy-on-write map, any new auto-followers will not
128+ * start since we check started status of the coordinator before starting them.
129+ */
130+ autoFollowers .values ().forEach (AutoFollower ::stop );
131+ }
132+
133+ @ Override
134+ protected void doClose () {
135+
136+ }
137+
116138 public synchronized AutoFollowStats getStats () {
117139 final Map <String , AutoFollower > autoFollowers = this .autoFollowers ;
118140 final TreeMap <String , AutoFollowedCluster > timesSinceLastAutoFollowPerRemoteCluster = new TreeMap <>();
@@ -241,8 +263,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
241263
242264 };
243265 newAutoFollowers .put (remoteCluster , autoFollower );
244- LOGGER .info ("starting auto follower for remote cluster [{}]" , remoteCluster );
245- autoFollower .start ();
266+ LOGGER .info ("starting auto-follower for remote cluster [{}]" , remoteCluster );
267+ if (lifecycleState () == Lifecycle .State .STARTED ) {
268+ autoFollower .start ();
269+ }
246270 }
247271
248272 List <String > removedRemoteClusters = new ArrayList <>();
@@ -252,13 +276,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
252276 boolean exist = autoFollowMetadata .getPatterns ().values ().stream ()
253277 .anyMatch (pattern -> pattern .getRemoteCluster ().equals (remoteCluster ));
254278 if (exist == false ) {
255- LOGGER .info ("removing auto follower for remote cluster [{}]" , remoteCluster );
279+ LOGGER .info ("removing auto- follower for remote cluster [{}]" , remoteCluster );
256280 autoFollower .removed = true ;
257281 removedRemoteClusters .add (remoteCluster );
258282 } else if (autoFollower .remoteClusterConnectionMissing ) {
259- LOGGER .info ("retrying auto follower [{}] after remote cluster connection was missing" , remoteCluster );
283+ LOGGER .info ("retrying auto- follower for remote cluster [{}] after remote cluster connection was missing" , remoteCluster );
260284 autoFollower .remoteClusterConnectionMissing = false ;
261- autoFollower .start ();
285+ if (lifecycleState () == Lifecycle .State .STARTED ) {
286+ autoFollower .start ();
287+ }
262288 }
263289 }
264290 assert assertNoOtherActiveAutoFollower (newAutoFollowers );
@@ -308,6 +334,7 @@ abstract static class AutoFollower {
308334 volatile boolean removed = false ;
309335 private volatile CountDown autoFollowPatternsCountDown ;
310336 private volatile AtomicArray <AutoFollowResult > autoFollowResults ;
337+ private volatile boolean stop ;
311338
312339 AutoFollower (final String remoteCluster ,
313340 final Consumer <List <AutoFollowResult >> statsUpdater ,
@@ -320,6 +347,10 @@ abstract static class AutoFollower {
320347 }
321348
322349 void start () {
350+ if (stop ) {
351+ LOGGER .trace ("auto-follower is stopped for remote cluster [{}]" , remoteCluster );
352+ return ;
353+ }
323354 if (removed ) {
324355 // This check exists to avoid two AutoFollower instances a single remote cluster.
325356 // (If an auto follow pattern is deleted and then added back quickly enough then
@@ -385,6 +416,11 @@ void start() {
385416 });
386417 }
387418
419+ void stop () {
420+ LOGGER .trace ("stopping auto-follower for remote cluster [{}]" , remoteCluster );
421+ stop = true ;
422+ }
423+
388424 private void autoFollowIndices (final AutoFollowMetadata autoFollowMetadata ,
389425 final ClusterState clusterState ,
390426 final ClusterState remoteClusterState ,
0 commit comments