diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index 66a2eb358986a..71784a8e9ebfd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -126,8 +126,12 @@ public void start(Collection jobs) { public void stop() { scheduler.shutdownNow(); try { - scheduler.awaitTermination(5, TimeUnit.SECONDS); + final boolean terminated = scheduler.awaitTermination(5L, TimeUnit.SECONDS); + if (terminated == false) { + logger.warn("scheduler engine was not terminated after waiting 5s"); + } } catch (InterruptedException e) { + logger.warn("interrupted while waiting for scheduler engine termination"); Thread.currentThread().interrupt(); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 4ef39d630f06c..8ab0399eae5d3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -100,6 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu private final Settings settings; private final boolean transportClientMode; private final SetOnce dataFrameTransformsConfigManager = new SetOnce<>(); + private final SetOnce schedulerEngine = new SetOnce<>(); public DataFrame(Settings settings) { this.settings = settings; @@ -201,12 +202,12 @@ public List> getPersistentTasksExecutor(ClusterServic return emptyList(); } - SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC()); + schedulerEngine.set(new SchedulerEngine(settings, Clock.systemUTC())); // the transforms config manager should have been created assert dataFrameTransformsConfigManager.get() != null; - return Collections.singletonList( - new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), schedulerEngine, threadPool)); + return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), + schedulerEngine.get(), threadPool)); } @Override @@ -223,4 +224,11 @@ public List getNamedXContent() { DataFrameTransformState::fromXContent) ); } + + @Override + public void close() { + if (schedulerEngine.get() != null) { + schedulerEngine.get().stop(); + } + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 8b82f2684924d..bd8e885c3cb77 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -68,4 +68,4 @@ protected AllocatedPersistentTask createTask(long id, String type, String action return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(), (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index d143e80340cc0..34cdbb46a5c3a 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.component.Lifecycle.State; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -53,8 +54,6 @@ public class IndexLifecycleService private final PolicyStepsRegistry policyRegistry; private final IndexLifecycleRunner lifecycleRunner; private final Settings settings; - private final ThreadPool threadPool; - private Client client; private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; @@ -63,13 +62,11 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { super(); this.settings = settings; - this.client = client; this.clusterService = clusterService; this.clock = clock; this.nowSupplier = nowSupplier; this.scheduledJob = null; this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client); - this.threadPool = threadPool; this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); clusterService.addStateApplier(this); @@ -158,14 +155,21 @@ SchedulerEngine.Job getScheduledJob() { return scheduledJob; } - private void maybeScheduleJob() { + private synchronized void maybeScheduleJob() { if (this.isMaster) { if (scheduler.get() == null) { - scheduler.set(new SchedulerEngine(settings, clock)); - scheduler.get().register(this); + // don't create scheduler if the node is shutting down + if (isClusterServiceStoppedOrClosed() == false) { + scheduler.set(new SchedulerEngine(settings, clock)); + scheduler.get().register(this); + } + } + + // scheduler could be null if the node might be shutting down + if (scheduler.get() != null) { + scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval)); + scheduler.get().add(scheduledJob); } - scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval)); - scheduler.get().add(scheduledJob); } } @@ -254,7 +258,11 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) } @Override - public void close() { + public synchronized void close() { + // this assertion is here to ensure that the check we use in maybeScheduleJob is accurate for detecting a shutdown in + // progress, which is that the cluster service is stopped and closed at some point prior to closing plugins + assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugin, which is expected to happen after " + + "the cluster service is stopped"; SchedulerEngine engine = scheduler.get(); if (engine != null) { engine.stop(); @@ -265,4 +273,13 @@ public void submitOperationModeUpdate(OperationMode mode) { clusterService.submitStateUpdateTask("ilm_operation_mode_update", new OperationModeUpdateTask(mode)); } + + /** + * Method that checks if the lifecycle state of the cluster service is stopped or closed. This + * enhances the readability of the code. + */ + private boolean isClusterServiceStoppedOrClosed() { + final State state = clusterService.lifecycleState(); + return state == State.STOPPED || state == State.CLOSED; + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index 810f913775e3b..67affe9f74ce0 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.component.Lifecycle.State; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -91,6 +92,7 @@ public void prepareServices() { Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build(); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, Collections.singleton(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING))); + when(clusterService.lifecycleState()).thenReturn(State.STARTED); Client client = mock(Client.class); AdminClient adminClient = mock(AdminClient.class); @@ -108,6 +110,7 @@ public void prepareServices() { @After public void cleanup() { + when(clusterService.lifecycleState()).thenReturn(randomFrom(State.STOPPED, State.CLOSED)); indexLifecycleService.close(); threadPool.shutdownNow(); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index ad409d4e2ca60..8ebbf1bccf864 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.rollup; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -104,6 +105,7 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")); + private final SetOnce schedulerEngine = new SetOnce<>(); private final Settings settings; private final boolean enabled; private final boolean transportClientMode; @@ -195,12 +197,19 @@ public List> getPersistentTasksExecutor(ClusterServic return emptyList(); } - SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock()); - return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine, threadPool)); + schedulerEngine.set(new SchedulerEngine(settings, getClock())); + return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine.get(), threadPool)); } // overridable by tests protected Clock getClock() { return Clock.systemUTC(); } + + @Override + public void close() { + if (schedulerEngine.get() != null) { + schedulerEngine.get().stop(); + } + } }