From d8d3153dc690eb3c34cf0f1564d5965fdb473e05 Mon Sep 17 00:00:00 2001 From: jaymode Date: Wed, 20 Feb 2019 10:35:20 -0700 Subject: [PATCH 1/6] Fixed missed stopping of SchedulerEngine The SchedulerEngine is used in several places in our code and not all of these usages properly stopped the SchedulerEngine, which could lead to test failures due to leaked threads from the SchedulerEngine. This change adds stopping to these usages in order to avoid the thread leaks that cause CI failures and noise. Closes #38875 --- .../persistent/PersistentTasksExecutor.java | 3 ++- .../PersistentTasksExecutorRegistry.java | 11 +++++++++- .../PersistentTasksNodeService.java | 8 +++++++- .../persistent/StartPersistentTaskAction.java | 20 +++++++++++++++++-- .../PersistentTasksClusterServiceTests.java | 4 ++++ .../PersistentTasksDecidersTestCase.java | 4 ++++ .../persistent/TestPersistentTasksPlugin.java | 4 ++++ .../ccr/action/ShardFollowTasksExecutor.java | 3 +++ ...FrameTransformPersistentTasksExecutor.java | 7 ++++++- .../xpack/indexlifecycle/IndexLifecycle.java | 7 +++++++ .../indexlifecycle/IndexLifecycleService.java | 12 +++++------ .../ml/action/TransportOpenJobAction.java | 4 ++++ .../action/TransportStartDatafeedAction.java | 4 ++++ .../xpack/rollup/job/RollupJobTask.java | 5 +++++ 14 files changed, 83 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index ab674a79c4e52..8658cea0081e9 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -26,6 +26,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.TaskId; +import java.io.Closeable; import java.util.Map; import java.util.function.Predicate; @@ -33,7 +34,7 @@ * An executor of tasks that can survive restart of requesting or executing node. * These tasks are using cluster state rather than only transport service to send requests and responses. */ -public abstract class PersistentTasksExecutor { +public abstract class PersistentTasksExecutor implements Closeable { private final String executor; private final String taskName; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java index 6d26431bc9541..18732af9fd523 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java @@ -18,6 +18,10 @@ */ package org.elasticsearch.persistent; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -26,7 +30,7 @@ /** * Components that registers all persistent task executors */ -public class PersistentTasksExecutorRegistry { +public class PersistentTasksExecutorRegistry implements Closeable { private final Map> taskExecutors; @@ -46,4 +50,9 @@ public PersistentTasksExecutor get } return executor; } + + @Override + public void close() throws IOException { + IOUtils.close(taskExecutors.values()); + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 260fabc67cdca..99e0853ec646b 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -36,6 +36,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -49,7 +50,7 @@ * This component is responsible for coordination of execution of persistent tasks on individual nodes. It runs on all * non-transport client nodes in the cluster and monitors cluster state changes to detect started commands. */ -public class PersistentTasksNodeService implements ClusterStateListener { +public class PersistentTasksNodeService implements ClusterStateListener, Closeable { private static final Logger logger = LogManager.getLogger(PersistentTasksNodeService.class); @@ -215,6 +216,11 @@ public void onFailure(Exception e) { } } + @Override + public void close() throws IOException { + persistentTasksExecutorRegistry.close(); + } + public static class Status implements Task.Status { public static final String NAME = "persistent_executor"; diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 2911bcb4f6376..8964cb4399bde 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.persistent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -33,6 +35,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -189,6 +192,8 @@ public RequestBuilder setRequest(PersistentTaskParams params) { public static class TransportAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportAction.class); + private final PersistentTasksClusterService persistentTasksClusterService; @Inject @@ -202,8 +207,19 @@ public TransportAction(TransportService transportService, ClusterService cluster indexNameExpressionResolver, Request::new); this.persistentTasksClusterService = persistentTasksClusterService; NodePersistentTasksExecutor executor = new NodePersistentTasksExecutor(threadPool); - clusterService.addListener(new PersistentTasksNodeService(persistentTasksService, persistentTasksExecutorRegistry, - transportService.getTaskManager(), executor)); + final PersistentTasksNodeService nodeService = new PersistentTasksNodeService(persistentTasksService, + persistentTasksExecutorRegistry, transportService.getTaskManager(), executor); + clusterService.addListener(nodeService); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void beforeStop() { + try { + nodeService.close(); + } catch (IOException e) { + logger.warn("caught exception while closing the persistent tasks node service", e); + } + } + }); } @Override diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index c3863e8ec08e4..7c2482055598d 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -844,6 +844,10 @@ public Assignment getAssignment(P params, ClusterState clusterState) { protected void nodeOperation(AllocatedPersistentTask task, P params, PersistentTaskState state) { throw new UnsupportedOperationException(); } + + @Override + public void close() { + } })); return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService, threadPool); } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index 2007c350b555b..52c7816525c59 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -66,6 +66,10 @@ public PersistentTasksExecutor get protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) { logger.debug("Executing task {}", task); } + + @Override + public void close() { + } }; } }; diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 151129c0cc1fa..f11d7bb680580 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -394,6 +394,10 @@ protected AllocatedPersistentTask createTask(long id, String type, String action PersistentTask task, Map headers) { return new TestTask(id, type, action, getDescription(task), parentTaskId, headers); } + + @Override + public void close() { + } } public static class TestTaskAction extends Action { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 46b3c6e54f576..f3226536aab88 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -337,4 +337,7 @@ private void fetchFollowerShardInfo( }, errorHandler)); } + @Override + public void close() { + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 8b82f2684924d..1a7face640a35 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -68,4 +68,9 @@ protected AllocatedPersistentTask createTask(long id, String type, String action return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(), (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers); } -} \ No newline at end of file + + @Override + public void close() { + schedulerEngine.stop(); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 2e7d2fbbc555d..4017ef3e80d77 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; @@ -137,6 +138,12 @@ public Collection createComponents(Client client, ClusterService cluster } indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, getClock(), System::currentTimeMillis, xContentRegistry)); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void beforeStop() { + indexLifecycleInitialisationService.get().close(); + } + }); return Collections.singletonList(indexLifecycleInitialisationService.get()); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index d143e80340cc0..94daf039a84bb 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -53,23 +53,20 @@ public class IndexLifecycleService private final PolicyStepsRegistry policyRegistry; private final IndexLifecycleRunner lifecycleRunner; private final Settings settings; - private final ThreadPool threadPool; - private Client client; private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; + private boolean closed = false; public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock, LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { super(); this.settings = settings; - this.client = client; this.clusterService = clusterService; this.clock = clock; this.nowSupplier = nowSupplier; this.scheduledJob = null; this.policyRegistry = new PolicyStepsRegistry(xContentRegistry, client); - this.threadPool = threadPool; this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, threadPool, nowSupplier); this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); clusterService.addStateApplier(this); @@ -158,8 +155,8 @@ SchedulerEngine.Job getScheduledJob() { return scheduledJob; } - private void maybeScheduleJob() { - if (this.isMaster) { + private synchronized void maybeScheduleJob() { + if (this.isMaster && closed == false) { if (scheduler.get() == null) { scheduler.set(new SchedulerEngine(settings, clock)); scheduler.get().register(this); @@ -254,7 +251,8 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) } @Override - public void close() { + public synchronized void close() { + closed = true; SchedulerEngine engine = scheduler.get(); if (engine != null) { engine.stop(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index ac4f435da130d..ad708d00da852 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -671,6 +671,10 @@ protected AllocatedPersistentTask createTask(long id, String type, String action return new JobTask(persistentTask.getParams().getJobId(), id, type, action, parentTaskId, headers); } + @Override + public void close() { + } + void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) { logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), this.maxConcurrentJobAllocations, maxConcurrentJobAllocations); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index f81fcfbfb1d2e..3374199c7387e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -375,6 +375,10 @@ protected AllocatedPersistentTask createTask( Map headers) { return new DatafeedTask(id, type, action, parentTaskId, persistentTask.getParams(), headers); } + + @Override + public void close() { + } } public static class DatafeedTask extends AllocatedPersistentTask implements StartDatafeedAction.DatafeedTaskMatcher { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index f545ab049d44d..f41fb62579be9 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -83,6 +83,11 @@ protected AllocatedPersistentTask createTask(long id, String type, String action return new RollupJobTask(id, type, action, parentTaskId, persistentTask.getParams(), (RollupJobStatus) persistentTask.getState(), client, schedulerEngine, threadPool, headers); } + + @Override + public void close() { + schedulerEngine.stop(); + } } /** From 8e657b614338b8fccec6d4e6210c79d38b0ba6a7 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 21 Feb 2019 08:25:08 -0700 Subject: [PATCH 2/6] address feedback regarding ILM --- .../xpack/core/scheduler/SchedulerEngine.java | 12 ++++++++++-- .../xpack/indexlifecycle/IndexLifecycle.java | 9 +-------- .../xpack/indexlifecycle/IndexLifecycleService.java | 13 ++++++------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index 66a2eb358986a..e92d2b95466d4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -123,15 +123,23 @@ public void start(Collection jobs) { jobs.forEach(this::add); } - public void stop() { + public synchronized void stop() { scheduler.shutdownNow(); try { - scheduler.awaitTermination(5, TimeUnit.SECONDS); + final boolean terminated = scheduler.awaitTermination(5L, TimeUnit.SECONDS); + if (terminated == false) { + logger.warn("scheduler engine was not terminated after waiting 5s"); + } } catch (InterruptedException e) { + logger.warn("interrupted while waiting for scheduler engine termination"); Thread.currentThread().interrupt(); } } + public synchronized boolean isStopped() { + return scheduler.isShutdown(); + } + public void add(Job job) { ActiveSchedule schedule = new ActiveSchedule(job.getId(), job.getSchedule(), clock.millis()); schedules.compute(schedule.name, (name, previousSchedule) -> { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 4017ef3e80d77..35c24febb88d1 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; @@ -43,9 +42,9 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; -import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction; import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; +import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; @@ -138,12 +137,6 @@ public Collection createComponents(Client client, ClusterService cluster } indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool, getClock(), System::currentTimeMillis, xContentRegistry)); - clusterService.addLifecycleListener(new LifecycleListener() { - @Override - public void beforeStop() { - indexLifecycleInitialisationService.get().close(); - } - }); return Collections.singletonList(indexLifecycleInitialisationService.get()); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 94daf039a84bb..8471ef9c07452 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -56,7 +56,6 @@ public class IndexLifecycleService private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; - private boolean closed = false; public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock, LongSupplier nowSupplier, NamedXContentRegistry xContentRegistry) { @@ -155,14 +154,15 @@ SchedulerEngine.Job getScheduledJob() { return scheduledJob; } - private synchronized void maybeScheduleJob() { - if (this.isMaster && closed == false) { + private void maybeScheduleJob() { + if (this.isMaster) { if (scheduler.get() == null) { scheduler.set(new SchedulerEngine(settings, clock)); scheduler.get().register(this); + } else if (scheduler.get().isStopped() == false) { + scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval)); + scheduler.get().add(scheduledJob); } - scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval)); - scheduler.get().add(scheduledJob); } } @@ -251,8 +251,7 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) } @Override - public synchronized void close() { - closed = true; + public void close() { SchedulerEngine engine = scheduler.get(); if (engine != null) { engine.stop(); From b85668e1ceb46b981a89fec4d39792d372ded757 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 21 Feb 2019 09:08:37 -0700 Subject: [PATCH 3/6] simplify change to using plugin close --- .../persistent/PersistentTasksExecutor.java | 3 +-- .../PersistentTasksExecutorRegistry.java | 11 +--------- .../PersistentTasksNodeService.java | 10 ++-------- .../persistent/StartPersistentTaskAction.java | 20 ++----------------- .../PersistentTasksClusterServiceTests.java | 4 ---- .../PersistentTasksDecidersTestCase.java | 4 ---- .../persistent/TestPersistentTasksPlugin.java | 4 ---- .../ccr/action/ShardFollowTasksExecutor.java | 4 ---- .../xpack/dataframe/DataFrame.java | 14 ++++++++++--- ...FrameTransformPersistentTasksExecutor.java | 5 ----- .../ml/action/TransportOpenJobAction.java | 4 ---- .../action/TransportStartDatafeedAction.java | 4 ---- .../elasticsearch/xpack/rollup/Rollup.java | 13 ++++++++++-- .../xpack/rollup/job/RollupJobTask.java | 5 ----- 14 files changed, 28 insertions(+), 77 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index 8658cea0081e9..ab674a79c4e52 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -26,7 +26,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.TaskId; -import java.io.Closeable; import java.util.Map; import java.util.function.Predicate; @@ -34,7 +33,7 @@ * An executor of tasks that can survive restart of requesting or executing node. * These tasks are using cluster state rather than only transport service to send requests and responses. */ -public abstract class PersistentTasksExecutor implements Closeable { +public abstract class PersistentTasksExecutor { private final String executor; private final String taskName; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java index 18732af9fd523..6d26431bc9541 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java @@ -18,10 +18,6 @@ */ package org.elasticsearch.persistent; -import org.elasticsearch.core.internal.io.IOUtils; - -import java.io.Closeable; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -30,7 +26,7 @@ /** * Components that registers all persistent task executors */ -public class PersistentTasksExecutorRegistry implements Closeable { +public class PersistentTasksExecutorRegistry { private final Map> taskExecutors; @@ -50,9 +46,4 @@ public PersistentTasksExecutor get } return executor; } - - @Override - public void close() throws IOException { - IOUtils.close(taskExecutors.values()); - } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 99e0853ec646b..04c5ebd51d02b 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -30,13 +30,12 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -50,7 +49,7 @@ * This component is responsible for coordination of execution of persistent tasks on individual nodes. It runs on all * non-transport client nodes in the cluster and monitors cluster state changes to detect started commands. */ -public class PersistentTasksNodeService implements ClusterStateListener, Closeable { +public class PersistentTasksNodeService implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(PersistentTasksNodeService.class); @@ -216,11 +215,6 @@ public void onFailure(Exception e) { } } - @Override - public void close() throws IOException { - persistentTasksExecutorRegistry.close(); - } - public static class Status implements Task.Status { public static final String NAME = "persistent_executor"; diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 8964cb4399bde..9838c1b4d0a36 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.persistent; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; @@ -35,7 +33,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -192,8 +189,6 @@ public RequestBuilder setRequest(PersistentTaskParams params) { public static class TransportAction extends TransportMasterNodeAction { - private static final Logger logger = LogManager.getLogger(TransportAction.class); - private final PersistentTasksClusterService persistentTasksClusterService; @Inject @@ -207,19 +202,8 @@ public TransportAction(TransportService transportService, ClusterService cluster indexNameExpressionResolver, Request::new); this.persistentTasksClusterService = persistentTasksClusterService; NodePersistentTasksExecutor executor = new NodePersistentTasksExecutor(threadPool); - final PersistentTasksNodeService nodeService = new PersistentTasksNodeService(persistentTasksService, - persistentTasksExecutorRegistry, transportService.getTaskManager(), executor); - clusterService.addListener(nodeService); - clusterService.addLifecycleListener(new LifecycleListener() { - @Override - public void beforeStop() { - try { - nodeService.close(); - } catch (IOException e) { - logger.warn("caught exception while closing the persistent tasks node service", e); - } - } - }); + clusterService.addListener(new PersistentTasksNodeService(persistentTasksService, persistentTasksExecutorRegistry, + transportService.getTaskManager(), executor)); } @Override diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 7c2482055598d..c3863e8ec08e4 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -844,10 +844,6 @@ public Assignment getAssignment(P params, ClusterState clusterState) { protected void nodeOperation(AllocatedPersistentTask task, P params, PersistentTaskState state) { throw new UnsupportedOperationException(); } - - @Override - public void close() { - } })); return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService, threadPool); } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index 52c7816525c59..2007c350b555b 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -66,10 +66,6 @@ public PersistentTasksExecutor get protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) { logger.debug("Executing task {}", task); } - - @Override - public void close() { - } }; } }; diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index f11d7bb680580..151129c0cc1fa 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -394,10 +394,6 @@ protected AllocatedPersistentTask createTask(long id, String type, String action PersistentTask task, Map headers) { return new TestTask(id, type, action, getDescription(task), parentTaskId, headers); } - - @Override - public void close() { - } } public static class TestTaskAction extends Action { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index f3226536aab88..8e4b31e60db9e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -336,8 +336,4 @@ private void fetchFollowerShardInfo( } }, errorHandler)); } - - @Override - public void close() { - } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 4ef39d630f06c..8ab0399eae5d3 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -100,6 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu private final Settings settings; private final boolean transportClientMode; private final SetOnce dataFrameTransformsConfigManager = new SetOnce<>(); + private final SetOnce schedulerEngine = new SetOnce<>(); public DataFrame(Settings settings) { this.settings = settings; @@ -201,12 +202,12 @@ public List> getPersistentTasksExecutor(ClusterServic return emptyList(); } - SchedulerEngine schedulerEngine = new SchedulerEngine(settings, Clock.systemUTC()); + schedulerEngine.set(new SchedulerEngine(settings, Clock.systemUTC())); // the transforms config manager should have been created assert dataFrameTransformsConfigManager.get() != null; - return Collections.singletonList( - new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), schedulerEngine, threadPool)); + return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(), + schedulerEngine.get(), threadPool)); } @Override @@ -223,4 +224,11 @@ public List getNamedXContent() { DataFrameTransformState::fromXContent) ); } + + @Override + public void close() { + if (schedulerEngine.get() != null) { + schedulerEngine.get().stop(); + } + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 1a7face640a35..bd8e885c3cb77 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -68,9 +68,4 @@ protected AllocatedPersistentTask createTask(long id, String type, String action return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(), (DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager, schedulerEngine, threadPool, headers); } - - @Override - public void close() { - schedulerEngine.stop(); - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index ad708d00da852..ac4f435da130d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -671,10 +671,6 @@ protected AllocatedPersistentTask createTask(long id, String type, String action return new JobTask(persistentTask.getParams().getJobId(), id, type, action, parentTaskId, headers); } - @Override - public void close() { - } - void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) { logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), this.maxConcurrentJobAllocations, maxConcurrentJobAllocations); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3374199c7387e..f81fcfbfb1d2e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -375,10 +375,6 @@ protected AllocatedPersistentTask createTask( Map headers) { return new DatafeedTask(id, type, action, parentTaskId, persistentTask.getParams(), headers); } - - @Override - public void close() { - } } public static class DatafeedTask extends AllocatedPersistentTask implements StartDatafeedAction.DatafeedTaskMatcher { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index ad409d4e2ca60..8ebbf1bccf864 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.rollup; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -104,6 +105,7 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin new HashSet<>(Arrays.asList("es-security-runas-user", "_xpack_security_authentication")); + private final SetOnce schedulerEngine = new SetOnce<>(); private final Settings settings; private final boolean enabled; private final boolean transportClientMode; @@ -195,12 +197,19 @@ public List> getPersistentTasksExecutor(ClusterServic return emptyList(); } - SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock()); - return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine, threadPool)); + schedulerEngine.set(new SchedulerEngine(settings, getClock())); + return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(client, schedulerEngine.get(), threadPool)); } // overridable by tests protected Clock getClock() { return Clock.systemUTC(); } + + @Override + public void close() { + if (schedulerEngine.get() != null) { + schedulerEngine.get().stop(); + } + } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index f41fb62579be9..f545ab049d44d 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -83,11 +83,6 @@ protected AllocatedPersistentTask createTask(long id, String type, String action return new RollupJobTask(id, type, action, parentTaskId, persistentTask.getParams(), (RollupJobStatus) persistentTask.getState(), client, schedulerEngine, threadPool, headers); } - - @Override - public void close() { - schedulerEngine.stop(); - } } /** From c35108513b85b11daa8f014c755692ab4c8b8963 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 21 Feb 2019 09:13:06 -0700 Subject: [PATCH 4/6] remove unnecessary changes --- .../elasticsearch/persistent/PersistentTasksNodeService.java | 2 +- .../org/elasticsearch/persistent/StartPersistentTaskAction.java | 2 +- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 1 + .../org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 04c5ebd51d02b..260fabc67cdca 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -30,11 +30,11 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.HashMap; diff --git a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java index 9838c1b4d0a36..2911bcb4f6376 100644 --- a/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/StartPersistentTaskAction.java @@ -203,7 +203,7 @@ public TransportAction(TransportService transportService, ClusterService cluster this.persistentTasksClusterService = persistentTasksClusterService; NodePersistentTasksExecutor executor = new NodePersistentTasksExecutor(threadPool); clusterService.addListener(new PersistentTasksNodeService(persistentTasksService, persistentTasksExecutorRegistry, - transportService.getTaskManager(), executor)); + transportService.getTaskManager(), executor)); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 8e4b31e60db9e..46b3c6e54f576 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -336,4 +336,5 @@ private void fetchFollowerShardInfo( } }, errorHandler)); } + } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 35c24febb88d1..2e7d2fbbc555d 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -42,9 +42,9 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction; import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; -import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; From 2571be5d9b2e803fac26b3e54554792fd6aaf183 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 21 Feb 2019 09:33:20 -0700 Subject: [PATCH 5/6] fix incorrect change --- .../xpack/core/scheduler/SchedulerEngine.java | 6 +---- .../indexlifecycle/IndexLifecycleService.java | 22 ++++++++++++++----- .../IndexLifecycleServiceTests.java | 3 +++ 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index e92d2b95466d4..71784a8e9ebfd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -123,7 +123,7 @@ public void start(Collection jobs) { jobs.forEach(this::add); } - public synchronized void stop() { + public void stop() { scheduler.shutdownNow(); try { final boolean terminated = scheduler.awaitTermination(5L, TimeUnit.SECONDS); @@ -136,10 +136,6 @@ public synchronized void stop() { } } - public synchronized boolean isStopped() { - return scheduler.isShutdown(); - } - public void add(Job job) { ActiveSchedule schedule = new ActiveSchedule(job.getId(), job.getSchedule(), clock.millis()); schedules.compute(schedule.name, (name, previousSchedule) -> { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 8471ef9c07452..3eefb518fdd3a 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.component.Lifecycle.State; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -154,12 +155,18 @@ SchedulerEngine.Job getScheduledJob() { return scheduledJob; } - private void maybeScheduleJob() { + private synchronized void maybeScheduleJob() { if (this.isMaster) { if (scheduler.get() == null) { - scheduler.set(new SchedulerEngine(settings, clock)); - scheduler.get().register(this); - } else if (scheduler.get().isStopped() == false) { + // don't create scheduler if the node is shutting down + if (isStoppedOrClosed(clusterService.lifecycleState()) == false) { + scheduler.set(new SchedulerEngine(settings, clock)); + scheduler.get().register(this); + } + } + + // scheduler could be null if the node might be shutting down + if (scheduler.get() != null) { scheduledJob = new SchedulerEngine.Job(XPackField.INDEX_LIFECYCLE, new TimeValueSchedule(pollInterval)); scheduler.get().add(scheduledJob); } @@ -251,7 +258,8 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) } @Override - public void close() { + public synchronized void close() { + assert isStoppedOrClosed(clusterService.lifecycleState()); SchedulerEngine engine = scheduler.get(); if (engine != null) { engine.stop(); @@ -262,4 +270,8 @@ public void submitOperationModeUpdate(OperationMode mode) { clusterService.submitStateUpdateTask("ilm_operation_mode_update", new OperationModeUpdateTask(mode)); } + + private static boolean isStoppedOrClosed(State state) { + return state == State.STOPPED || state == State.CLOSED; + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index 810f913775e3b..3eacf920d9118 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.component.Lifecycle.State; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -91,6 +92,7 @@ public void prepareServices() { Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build(); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, Collections.singleton(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING))); + when(clusterService.lifecycleState()).thenReturn(State.STARTED); Client client = mock(Client.class); AdminClient adminClient = mock(AdminClient.class); @@ -108,6 +110,7 @@ public void prepareServices() { @After public void cleanup() { + when(clusterService.lifecycleState()).thenReturn(State.STOPPED); indexLifecycleService.close(); threadPool.shutdownNow(); } From 5a8b3d06f66e7b7b439fadb02adf2954b2a757f3 Mon Sep 17 00:00:00 2001 From: jaymode Date: Thu, 21 Feb 2019 12:37:20 -0700 Subject: [PATCH 6/6] address new feedback --- .../indexlifecycle/IndexLifecycleService.java | 14 +++++++++++--- .../indexlifecycle/IndexLifecycleServiceTests.java | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 3eefb518fdd3a..34cdbb46a5c3a 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -159,7 +159,7 @@ private synchronized void maybeScheduleJob() { if (this.isMaster) { if (scheduler.get() == null) { // don't create scheduler if the node is shutting down - if (isStoppedOrClosed(clusterService.lifecycleState()) == false) { + if (isClusterServiceStoppedOrClosed() == false) { scheduler.set(new SchedulerEngine(settings, clock)); scheduler.get().register(this); } @@ -259,7 +259,10 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) @Override public synchronized void close() { - assert isStoppedOrClosed(clusterService.lifecycleState()); + // this assertion is here to ensure that the check we use in maybeScheduleJob is accurate for detecting a shutdown in + // progress, which is that the cluster service is stopped and closed at some point prior to closing plugins + assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugin, which is expected to happen after " + + "the cluster service is stopped"; SchedulerEngine engine = scheduler.get(); if (engine != null) { engine.stop(); @@ -271,7 +274,12 @@ public void submitOperationModeUpdate(OperationMode mode) { new OperationModeUpdateTask(mode)); } - private static boolean isStoppedOrClosed(State state) { + /** + * Method that checks if the lifecycle state of the cluster service is stopped or closed. This + * enhances the readability of the code. + */ + private boolean isClusterServiceStoppedOrClosed() { + final State state = clusterService.lifecycleState(); return state == State.STOPPED || state == State.CLOSED; } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index 3eacf920d9118..67affe9f74ce0 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -110,7 +110,7 @@ public void prepareServices() { @After public void cleanup() { - when(clusterService.lifecycleState()).thenReturn(State.STOPPED); + when(clusterService.lifecycleState()).thenReturn(randomFrom(State.STOPPED, State.CLOSED)); indexLifecycleService.close(); threadPool.shutdownNow(); }