1818import org .elasticsearch .cluster .metadata .IndexMetaData ;
1919import org .elasticsearch .cluster .service .ClusterService ;
2020import org .elasticsearch .common .Strings ;
21+ import org .elasticsearch .common .component .Lifecycle .State ;
2122import org .elasticsearch .common .settings .Settings ;
2223import org .elasticsearch .common .unit .TimeValue ;
2324import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
@@ -53,8 +54,6 @@ public class IndexLifecycleService
5354 private final PolicyStepsRegistry policyRegistry ;
5455 private final IndexLifecycleRunner lifecycleRunner ;
5556 private final Settings settings ;
56- private final ThreadPool threadPool ;
57- private Client client ;
5857 private ClusterService clusterService ;
5958 private LongSupplier nowSupplier ;
6059 private SchedulerEngine .Job scheduledJob ;
@@ -63,13 +62,11 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl
6362 LongSupplier nowSupplier , NamedXContentRegistry xContentRegistry ) {
6463 super ();
6564 this .settings = settings ;
66- this .client = client ;
6765 this .clusterService = clusterService ;
6866 this .clock = clock ;
6967 this .nowSupplier = nowSupplier ;
7068 this .scheduledJob = null ;
7169 this .policyRegistry = new PolicyStepsRegistry (xContentRegistry , client );
72- this .threadPool = threadPool ;
7370 this .lifecycleRunner = new IndexLifecycleRunner (policyRegistry , clusterService , threadPool , nowSupplier );
7471 this .pollInterval = LifecycleSettings .LIFECYCLE_POLL_INTERVAL_SETTING .get (settings );
7572 clusterService .addStateApplier (this );
@@ -158,14 +155,21 @@ SchedulerEngine.Job getScheduledJob() {
158155 return scheduledJob ;
159156 }
160157
161- private void maybeScheduleJob () {
158+ private synchronized void maybeScheduleJob () {
162159 if (this .isMaster ) {
163160 if (scheduler .get () == null ) {
164- scheduler .set (new SchedulerEngine (settings , clock ));
165- scheduler .get ().register (this );
161+ // don't create scheduler if the node is shutting down
162+ if (isClusterServiceStoppedOrClosed () == false ) {
163+ scheduler .set (new SchedulerEngine (settings , clock ));
164+ scheduler .get ().register (this );
165+ }
166+ }
167+
168+ // scheduler could be null if the node might be shutting down
169+ if (scheduler .get () != null ) {
170+ scheduledJob = new SchedulerEngine .Job (XPackField .INDEX_LIFECYCLE , new TimeValueSchedule (pollInterval ));
171+ scheduler .get ().add (scheduledJob );
166172 }
167- scheduledJob = new SchedulerEngine .Job (XPackField .INDEX_LIFECYCLE , new TimeValueSchedule (pollInterval ));
168- scheduler .get ().add (scheduledJob );
169173 }
170174 }
171175
@@ -254,7 +258,11 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange)
254258 }
255259
256260 @ Override
257- public void close () {
261+ public synchronized void close () {
262+ // this assertion is here to ensure that the check we use in maybeScheduleJob is accurate for detecting a shutdown in
263+ // progress, which is that the cluster service is stopped and closed at some point prior to closing plugins
264+ assert isClusterServiceStoppedOrClosed () : "close is called by closing the plugin, which is expected to happen after " +
265+ "the cluster service is stopped" ;
258266 SchedulerEngine engine = scheduler .get ();
259267 if (engine != null ) {
260268 engine .stop ();
@@ -265,4 +273,13 @@ public void submitOperationModeUpdate(OperationMode mode) {
265273 clusterService .submitStateUpdateTask ("ilm_operation_mode_update" ,
266274 new OperationModeUpdateTask (mode ));
267275 }
276+
277+ /**
278+ * Method that checks if the lifecycle state of the cluster service is stopped or closed. This
279+ * enhances the readability of the code.
280+ */
281+ private boolean isClusterServiceStoppedOrClosed () {
282+ final State state = clusterService .lifecycleState ();
283+ return state == State .STOPPED || state == State .CLOSED ;
284+ }
268285}
0 commit comments