2626import org .elasticsearch .cluster .service .ClusterService ;
2727import org .elasticsearch .common .collect .CopyOnWriteHashMap ;
2828import org .elasticsearch .common .collect .Tuple ;
29+ import org .elasticsearch .common .component .AbstractLifecycleComponent ;
30+ import org .elasticsearch .common .component .Lifecycle ;
2931import org .elasticsearch .common .settings .Settings ;
3032import org .elasticsearch .common .unit .TimeValue ;
3133import org .elasticsearch .common .util .concurrent .AtomicArray ;
6668 * A component that runs only on the elected master node and follows leader indices automatically
6769 * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
6870 */
69- public class AutoFollowCoordinator implements ClusterStateListener {
71+ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements ClusterStateListener {
7072
7173 private static final Logger LOGGER = LogManager .getLogger (AutoFollowCoordinator .class );
7274 private static final int MAX_AUTO_FOLLOW_ERRORS = 256 ;
@@ -117,6 +119,26 @@ protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, Elastics
117119 waitForMetadataTimeOut = CcrSettings .CCR_WAIT_FOR_METADATA_TIMEOUT .get (settings );
118120 }
119121
122+ @ Override
123+ protected void doStart () {
124+
125+ }
126+
127+ @ Override
128+ protected void doStop () {
129+ LOGGER .trace ("stopping all auto-followers" );
130+ /*
131+ * Synchronization is not necessary here; the field is volatile and the map is a copy-on-write map, any new auto-followers will not
132+ * start since we check started status of the coordinator before starting them.
133+ */
134+ autoFollowers .values ().forEach (AutoFollower ::stop );
135+ }
136+
137+ @ Override
138+ protected void doClose () {
139+
140+ }
141+
120142 public synchronized AutoFollowStats getStats () {
121143 final Map <String , AutoFollower > autoFollowers = this .autoFollowers ;
122144 final TreeMap <String , AutoFollowedCluster > timesSinceLastAutoFollowPerRemoteCluster = new TreeMap <>();
@@ -246,8 +268,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
246268
247269 };
248270 newAutoFollowers .put (remoteCluster , autoFollower );
249- LOGGER .info ("starting auto follower for remote cluster [{}]" , remoteCluster );
250- autoFollower .start ();
271+ LOGGER .info ("starting auto-follower for remote cluster [{}]" , remoteCluster );
272+ if (lifecycleState () == Lifecycle .State .STARTED ) {
273+ autoFollower .start ();
274+ }
251275 }
252276
253277 List <String > removedRemoteClusters = new ArrayList <>();
@@ -257,13 +281,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
257281 boolean exist = autoFollowMetadata .getPatterns ().values ().stream ()
258282 .anyMatch (pattern -> pattern .getRemoteCluster ().equals (remoteCluster ));
259283 if (exist == false ) {
260- LOGGER .info ("removing auto follower for remote cluster [{}]" , remoteCluster );
284+ LOGGER .info ("removing auto- follower for remote cluster [{}]" , remoteCluster );
261285 autoFollower .removed = true ;
262286 removedRemoteClusters .add (remoteCluster );
263287 } else if (autoFollower .remoteClusterConnectionMissing ) {
264- LOGGER .info ("retrying auto follower [{}] after remote cluster connection was missing" , remoteCluster );
288+ LOGGER .info ("retrying auto- follower for remote cluster [{}] after remote cluster connection was missing" , remoteCluster );
265289 autoFollower .remoteClusterConnectionMissing = false ;
266- autoFollower .start ();
290+ if (lifecycleState () == Lifecycle .State .STARTED ) {
291+ autoFollower .start ();
292+ }
267293 }
268294 }
269295 assert assertNoOtherActiveAutoFollower (newAutoFollowers );
@@ -313,6 +339,7 @@ abstract static class AutoFollower {
313339 volatile boolean removed = false ;
314340 private volatile CountDown autoFollowPatternsCountDown ;
315341 private volatile AtomicArray <AutoFollowResult > autoFollowResults ;
342+ private volatile boolean stop ;
316343
317344 AutoFollower (final String remoteCluster ,
318345 final Consumer <List <AutoFollowResult >> statsUpdater ,
@@ -325,6 +352,10 @@ abstract static class AutoFollower {
325352 }
326353
327354 void start () {
355+ if (stop ) {
356+ LOGGER .trace ("auto-follower is stopped for remote cluster [{}]" , remoteCluster );
357+ return ;
358+ }
328359 if (removed ) {
329360 // This check exists to avoid two AutoFollower instances a single remote cluster.
330361 // (If an auto follow pattern is deleted and then added back quickly enough then
@@ -389,6 +420,11 @@ void start() {
389420 });
390421 }
391422
423+ void stop () {
424+ LOGGER .trace ("stopping auto-follower for remote cluster [{}]" , remoteCluster );
425+ stop = true ;
426+ }
427+
392428 private void autoFollowIndices (final AutoFollowMetadata autoFollowMetadata ,
393429 final ClusterState clusterState ,
394430 final ClusterState remoteClusterState ,
0 commit comments