2525import org .elasticsearch .cluster .service .ClusterService ;
2626import org .elasticsearch .common .collect .CopyOnWriteHashMap ;
2727import org .elasticsearch .common .collect .Tuple ;
28+ import org .elasticsearch .common .component .AbstractLifecycleComponent ;
29+ import org .elasticsearch .common .component .Lifecycle ;
2830import org .elasticsearch .common .settings .Settings ;
2931import org .elasticsearch .common .unit .TimeValue ;
3032import org .elasticsearch .common .util .concurrent .AtomicArray ;
6567 * A component that runs only on the elected master node and follows leader indices automatically
6668 * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
6769 */
68- public class AutoFollowCoordinator implements ClusterStateListener {
70+ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ClusterStateListener {
6971
7072 private static final Logger LOGGER = LogManager .getLogger (AutoFollowCoordinator .class );
7173 private static final int MAX_AUTO_FOLLOW_ERRORS = 256 ;
@@ -116,6 +118,26 @@ protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, Elastics
116118 waitForMetadataTimeOut = CcrSettings .CCR_WAIT_FOR_METADATA_TIMEOUT .get (settings );
117119 }
118120
121+ @ Override
122+ protected void doStart () {
123+
124+ }
125+
126+ @ Override
127+ protected void doStop () {
128+ LOGGER .trace ("stopping all auto-followers" );
129+ /*
130+ * Synchronization is not necessary here; the field is volatile and the map is a copy-on-write map, any new auto-followers will not
131+ * start since we check started status of the coordinator before starting them.
132+ */
133+ autoFollowers .values ().forEach (AutoFollower ::stop );
134+ }
135+
136+ @ Override
137+ protected void doClose () {
138+
139+ }
140+
119141 public synchronized AutoFollowStats getStats () {
120142 final Map <String , AutoFollower > autoFollowers = this .autoFollowers ;
121143 final TreeMap <String , AutoFollowedCluster > timesSinceLastAutoFollowPerRemoteCluster = new TreeMap <>();
@@ -245,8 +267,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
245267
246268 };
247269 newAutoFollowers .put (remoteCluster , autoFollower );
248- LOGGER .info ("starting auto follower for remote cluster [{}]" , remoteCluster );
249- autoFollower .start ();
270+ LOGGER .info ("starting auto-follower for remote cluster [{}]" , remoteCluster );
271+ if (lifecycleState () == Lifecycle .State .STARTED ) {
272+ autoFollower .start ();
273+ }
250274 }
251275
252276 List <String > removedRemoteClusters = new ArrayList <>();
@@ -256,13 +280,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
256280 boolean exist = autoFollowMetadata .getPatterns ().values ().stream ()
257281 .anyMatch (pattern -> pattern .getRemoteCluster ().equals (remoteCluster ));
258282 if (exist == false ) {
259- LOGGER .info ("removing auto follower for remote cluster [{}]" , remoteCluster );
283+ LOGGER .info ("removing auto- follower for remote cluster [{}]" , remoteCluster );
260284 autoFollower .removed = true ;
261285 removedRemoteClusters .add (remoteCluster );
262286 } else if (autoFollower .remoteClusterConnectionMissing ) {
263- LOGGER .info ("retrying auto follower [{}] after remote cluster connection was missing" , remoteCluster );
287+ LOGGER .info ("retrying auto- follower for remote cluster [{}] after remote cluster connection was missing" , remoteCluster );
264288 autoFollower .remoteClusterConnectionMissing = false ;
265- autoFollower .start ();
289+ if (lifecycleState () == Lifecycle .State .STARTED ) {
290+ autoFollower .start ();
291+ }
266292 }
267293 }
268294 assert assertNoOtherActiveAutoFollower (newAutoFollowers );
@@ -312,6 +338,7 @@ abstract static class AutoFollower {
312338 volatile boolean removed = false ;
313339 private volatile CountDown autoFollowPatternsCountDown ;
314340 private volatile AtomicArray <AutoFollowResult > autoFollowResults ;
341+ private volatile boolean stop ;
315342
316343 AutoFollower (final String remoteCluster ,
317344 final Consumer <List <AutoFollowResult >> statsUpdater ,
@@ -324,6 +351,10 @@ abstract static class AutoFollower {
324351 }
325352
326353 void start () {
354+ if (stop ) {
355+ LOGGER .trace ("auto-follower is stopped for remote cluster [{}]" , remoteCluster );
356+ return ;
357+ }
327358 if (removed ) {
328359 // This check exists to avoid two AutoFollower instances a single remote cluster.
329360 // (If an auto follow pattern is deleted and then added back quickly enough then
@@ -388,6 +419,11 @@ void start() {
388419 });
389420 }
390421
422+ void stop () {
423+ LOGGER .trace ("stopping auto-follower for remote cluster [{}]" , remoteCluster );
424+ stop = true ;
425+ }
426+
391427 private void autoFollowIndices (final AutoFollowMetadata autoFollowMetadata ,
392428 final ClusterState clusterState ,
393429 final ClusterState remoteClusterState ,
0 commit comments